From 742f80ee5c706044ecc5746813eb4d8e0c54acea Mon Sep 17 00:00:00 2001 From: youthlql <1826692270@qq.com> Date: Tue, 9 Nov 2021 23:35:20 +0800 Subject: [PATCH] =?UTF-8?q?dubbo=E6=BA=90=E7=A0=81=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E4=B8=A4=E7=AF=87=E6=96=87=E7=AB=A0-dubbo=E5=AE=8C=E7=BB=93?= =?UTF-8?q?=EF=BC=8C=E4=B8=8B=E6=AC=A1=E9=A2=84=E8=AE=A1=E6=9B=B4=E6=96=B0?= =?UTF-8?q?Spring=E6=BA=90=E7=A0=81=E7=B3=BB=E5=88=97=E6=96=87=E7=AB=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 54 +- ...源码系列V1-Dubbo第六节-服务引入源码解析.md | 1420 +++++++++++++++++ ...源码系列V1-Dubbo第七节-服务调用源码解析.md | 572 +++++++ 3 files changed, 2007 insertions(+), 39 deletions(-) create mode 100644 docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md create mode 100644 docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md diff --git a/README.md b/README.md index 01ac917..249d03b 100644 --- a/README.md +++ b/README.md @@ -18,39 +18,11 @@ - - -# 目录 - -- [Java](#java) - - - [基础](#基础) - - [容器](#容器) - - [并发](#并发) - - [JVM](#JVM) - - [各版本新特性](#各版本新特性) - - - -- [计算机网络](#计算机网络) - - - -- [ElasticSearch](#ElasticSearch) - -# 随笔 - -[我的校招-不完全知识点整理](docs/suibi/我的校招-不完全知识点整理.md) - # Java ## 基础 -1、总结【TODO】 - - - -**重难点** +### 重难点 1、[泛型详解【万字长文】](docs/Java/Basis/keyAndDifficultPoints/Generic/泛型.md) @@ -58,7 +30,7 @@ ## 容器 -**HashMap** +### HashMap [HashMap-JDK7源码讲解](docs/Java/collection/HashMap-JDK7源码讲解.md) @@ -66,10 +38,6 @@ -**ConcurrentHashMap源码讲解(JDK7和JDK8)【TODO】** - - - ## 并发 > 这个系列基本全是万字长文,希望读者可以耐心看下去,相信会有很大收获。 @@ -90,13 +58,11 @@ -AQS剩余部分,以及阻塞队列源码暂时先搁置一下。 - ## JVM -**1、内存与垃圾回收篇** +### 内存与垃圾回收篇 1、[JVM系列-第1章-JVM与Java体系结构](docs/Java/JVM/JVM系列-第1章-JVM与Java体系结构.md) @@ -126,7 +92,7 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。 -## 各版本新特性 +## JDK新特性 1、[Java8新特性](docs/Java/Basis/Java8_New_Features/Java8新特性.md) @@ -150,7 +116,7 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。 -# 设计模式【8.2更新基本完毕】 +# 设计模式 [1.设计模式-设计思想](docs/design_patterns/design_ideas/设计模式-01.设计思想.md) @@ -182,6 +148,10 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。 [Spring常用注解](docs/spring/use/Spring常用注解.md) + + +## 源码【预计11月或者12月更新】 + # Netty ## 入门 @@ -206,6 +176,12 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。 4. [Dubbo服务导出源码解析](docs/rpc/dubbo/05.Dubbo源码系列V1-Dubbo第五节-服务导出源码解析.md) +5. [Dubbo服务引入源码解析](docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md) + +6. [Dubbo服务调用源码解析](docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md) + + + # Apollo diff --git a/docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md b/docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md new file mode 100644 index 0000000..4bcb0e2 --- /dev/null +++ b/docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md @@ -0,0 +1,1420 @@ +--- +title: 06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析 +tags: + - Dubbo + - rpc +categories: + - rpc + - Dubbo源码系列v1 +keywords: Dubbo,rpc +description: 服务引入源码解析 +cover: 'https://cdn.jsdelivr.net/gh/youthlql/youthlql/img/dubbo.png' +abbrlink: bda15919 +date: 2021-11-08 14:11:58 +--- + + + +## 第六节: Dubbo服务引入源码解析 + +### 过程 + + + +当Spring启动过程中,会去给@Reference注解标注了的属性去进行赋值,赋值的对象为ReferenceBean中get()方法所返回的对象,这个对象是一个代理对象。 + + + +对于ReferenceBean,它表示应用想要引入的服务的信息,在执行get()时会做如下几步: + +1. 调用checkAndUpdateSubConfigs(),检查和更新参数,和服务提供者类似,把ReferenceBean里的属性的值更新为优先级最高的参数值 +2. 调用init()去生成代理对象ref,get()方法会返回这个ref +3. 在生成代理对象ref之前,先把消费者所引入服务设置的参数添加到一个map中,等会根据这个map中的参数去从注册中心查找服务 +4. 把消费者配置的所有注册中心获取出来 + + 1. 如果只有一个注册中心,那么直接调用Protocol的refer(interfaceClass, urls.get(0));得到一个Invoker对象 + 2. 如果有多个注册中心,则遍历每个注册中心,分别调用Protocol的refer(interfaceClass, url);得到一个Invoker对象添加到invokers中,然后把invokers调用CLUSTER.join(new StaticDirectory(u, invokers));封装所有invokers得到一个invoker, + +5. 把最终得到的invoker对象调用PROXY_FACTORY.getProxy(invoker);得到一个代理对象,并返回,这个代理对象就是ref +6. 总结:上文的Invoker对象,表示服务执行者,从注册中心refer下来的是一个服务执行者,合并invokers后得到的invoker也是一个服务执行者(抽象范围更大了) + + + + + +接下来,来看Protorol.refer(interfaceClass, url)方法是怎么生成一个Invoker的 + +1. 首先interfaceClass表示要引入的服务接口,url是注册中心的url(registry://),该url中有一个refer参数,参数值为当前所要引入服务的参数 +2. 调用doRefer(**cluster**, registry, type, url) +3. 在doRefer方法中会生成一个RegistryDirectory +4. 然后获取新版本中的路由器链,并添加到RegistryDirectory中去 +5. RegistryDirectory监听几个目录(注意,完成监听器的订阅绑定后,**会自动触发一次去获取这些目录上的当前数据**) + + 1. 当前所引入的服务的动态配置目录:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators + 2. 当前所引入的服务的提供者目录:/dubbo/org.apache.dubbo.demo.DemoService/providers + 3. 当前所引入的服务的老版本动态配置目录:/dubbo/org.apache.dubbo.demo.DemoService/configurators + 4. 当前所引入的服务的老版本路由器目录:/dubbo/org.apache.dubbo.demo.DemoService/routers + +6. 调用cluster.join(directory)得到一个invoker +7. 返回invoker(如果消费者引入了多个group中的服务,那么这里返回的是new MergeableClusterInvoker(directory);,否则返回的是new FailoverClusterInvoker(directory);) +8. 但是,上面返回的两个Invoker都会被MockClusterInvoker包装,所以最终返回的是MockClusterInvoker。 + + + +### Dubbo官方给的Demo + +```java +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dubbo.demo.consumer.comp; + +import org.apache.dubbo.config.annotation.Reference; +import org.apache.dubbo.demo.DemoService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component("demoServiceComponent") +public class DemoServiceComponent implements DemoService { + + @Reference(version = "1.0.1", group = "youthlql", mock = "fail: return 123") + private DemoService demoService; + + @Override + public String sayHello(String name) { + return demoService.sayHello(name); // Invoker + } +} +``` + + + + + +```java +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.dubbo.demo.consumer; + +import org.apache.dubbo.config.ConsumerConfig; +import org.apache.dubbo.config.ReferenceConfig; +import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; +import org.apache.dubbo.demo.DemoService; +import org.apache.dubbo.demo.consumer.comp.DemoServiceComponent; +import org.apache.dubbo.rpc.service.EchoService; +import org.springframework.context.annotation.*; + +import java.io.IOException; + +public class Application { + /** + * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before + * launch the application + */ + public static void main(String[] args) throws IOException { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class); + context.start(); + DemoService service = context.getBean("demoServiceComponent", DemoServiceComponent.class); + System.out.println("开始调用"); + String hello = service.sayHello("world"); + System.out.println("result :" + hello); + + + System.in.read(); + } + + @Configuration + @EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.consumer.comp") + @PropertySource("classpath:/spring/dubbo-consumer.properties") + @ComponentScan(value = {"org.apache.dubbo.demo.consumer.comp"}) + static class ConsumerConfiguration { + + @Bean + public ConsumerConfig consumerConfig() { + return new ConsumerConfig(); + } + + } +} +``` + +### 服务引入过程概述 + +#### 什么是服务引入 + +1. 首先服务引入的意思:在Spring启动的时候去扫描那个@Reference注解标注的Bean,把它注入到容器中,并且去构建一下服务真正调用时候的一些必要东西,比如调用哪个IP哪个端口的哪个服务实例,这样的东西叫做服务目录。 +2. 注意服务目录是需要提前构建到本地,不能等你调用的时候再去注册中心获取,这样会很慢。从这个角度来说,服务目录相当于把注册中心上关于服务提供者的信息提前缓存到本地,只不过做了一些转化。 +3. 总的来说,服务引入的目的就是构建服务目录 +4. 每一个服务提供者的接口都有一个服务目录,比如我们用@Reference标记了DemoService和MyService两个服务提供者,那么这两个服务提供者都有自己的服务目录 + +消费端每个服务对应一个服务目录RegistryDirectory。 + + + +一个服务目录中包含了: + +1. serviceType:表示服务接口 +2. serviceKey:表示引入的服务key,serviceclass+version+group +3. queryMap:表示引入的服务的参数配置 +4. configurators:动态配置 +5. routerChain:路由链 +6. invokers:表示服务目录当前缓存的服务提供者Invoker +7. ConsumerConfigurationListener:监听本应用的动态配置 +8. ReferenceConfigurationListener:监听所引入的服务的动态配置 + +#### 大致的步骤 + +1. 解析@Reference注解上的一些配置,生成URL +2. 监听注册中心并拉取注册中心上相关的配置。 + 1. 服务启动时的-D参数,并且从注册中心上拉取web端的动态配置。这些参数配置根据优先级顺序覆盖掉注册中心上服务提供者的配置。优先级的话前面也讲过web端的动态配置 > -D参数 > @Reference注解 > @Services生成的提供者URL。 + 2. 注册中心上也包括了路由链的配置 + 3. 监听。首先服务目录就是把注册中心上的信息缓存到了本地,所以如果注册中心上信息变了,本地的服务目录也要随之更新,也就是需要监听。 +3. 把上面几步整合的URL转成`List`。`List`才是真正要用的 + + + + + +#### 监听 + +在服务消费端有几个监听器: + +1. ConsumerConfigurationListener:监听本应用的动态配置,当应用的动态配置发生了修改后,会调用RegistryDirectory的refreshInvoker()方法,对应的路径为:**"/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators"** +2. ReferenceConfigurationListener:监听所引入的服务的动态配置,当服务的动态配置发生了修改后,会调用RegistryDirectory的refreshInvoker()方法,对应的路径为:**"/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators"** +3. RegistryDirectory:RegistryDirectory本身也是一个监听器,它会监听所引入的服务提供者、服务动态配置(老版本)、服务路由,路径分别为: + + 1. **"/dubbo/org.apache.dubbo.demo.DemoService/providers"** + 2. **"/dubbo/org.apache.dubbo.demo.DemoService/configurators"** + 3. **"/dubbo/org.apache.dubbo.demo.DemoService/routers"** + +4. 路由器Router:每个Router自己本身也是一个监听器,负责监听对应的路径 + + 1. AppRouter:应用路由,监听的路径为**"/dubbo/config/dubbo/dubbo-demo-consumer-application.condition-router"** + 2. ServiceRouter: 服务路由,监听的路径为**"/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.condition-router"** + 3. TagRouter: 标签路由,标签路由和应用路由、服务路由有所区别,应用路由和服务路由都是在消费者启动,在构造路由链时会进行监听器的绑定,但是标签路由不是消费者启动的时候绑定监听器的,是在引入服务时,获取到服务的提供者URL之后,才会去监听.tag-router节点中的内容,监听的路径为**"/dubbo/config/dubbo/dubbo-demo-provider-application.tag-router"** + +5. 当ConsumerConfigurationListener接收到了消费者应用的动态配置数据变化后,会调用当前消费者应用中的所有RegistryDirectory的refreshInvoker()方法,表示刷新消费者应用中引入的每个服务对应的Invoker +6. 当ReferenceConfigurationListener接收到了某个服务的动态配置数据变化后,会调用该服务对应的RegistryDirectory的refreshInvoker()方法,表示刷新该服务对应的Invoker +7. 当AppRouter和ServiceRouter接收到条件路由的数据变化后,就会更新Router内部的routerRule和conditionRouters属性。这两个属性在服务调用过程中会用到。 +8. 当TagRouter接收到标签路由的数据变化后,就会更新TagRouter内部的tagRouterRule的属性,这个属性在服务调用过程中会用到。 +9. 当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/configurators"节点数据变化后,会生成**configurators** +10. 当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/**routers**"节点数据变化后,会生成Router并添加到**routerChain中** +11. 当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/**providers**"节点数据变化后,会调用refreshOverrideAndInvoker()方法。这个方法就是用来针对每个服务提供者来生成Invoker的。 + 1. refreshOverrideAndInvoker方法中首先调用overrideDirectoryUrl()方法利用Configurators重写目录地址,目录地址是这样的:`zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-consumer-application&dubbo=2.0.2&group=g1&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=49964®ister.ip=192.168.40.17&release=2.7.0&revision=1.1.1&side=consumer&sticky=false×tamp=1591339005022&version=1.1.1`,在注册中心URL基础上把当前引入服务的参数作为URL的Parameters,所以这个地址既包括了注册中心的信息,也包括了当前引入服务的信息 + 2. 利用老版本configurators,Consumer应用的configurators,引入的服务的configurators去重写目录地址。 + 3. 重写往目录地址后,调用refreshInvoker(urls)方法去刷新Invoker + 4. 在refreshInvoker(urls)方法中会把从注册中心获取到的providers节点下的服务URL,调用toInvokers(invokerUrls)方法得到Invoker + 5. 先按Protocol进行过滤,并且调用DubboProtocol.refer方法得到Invoker + 6. 将得到的invokers设置到RouterChain上,并且调用RouterChain上所有的**routers的**notify(**invokers**)方法,实际上这里只有TagRouter的notify方法有用 + 7. 再把属于同一个group中的invoker合并起来 + 8. 这样Invoker就生成好了 + + + + + +### 源码分析-解析@Reference注解上的配置 + +#### ReferenceBean + +```java + /** + * 1.这里是入口方法,Spring容器在启动时就会先生成ReferenceBean对象,接着会调用{@link ReferenceConfig#get()} + * 2.意思就是当你标注了注解@Reference,Spring启动扫描bean的时候会调用此类重写的 + * {@link ReferenceBean#afterPropertiesSet()},最终会调到get()返回一个代理对象给那个bean + * 3.也就是说在调用此方法之前,ReferenceBean对象就已经生成了,里面的属性值也都有了。 + */ + @Override + public Object getObject() { + // 这里调用ReferenceConfig#get() + return get(); + } + + public void afterPropertiesSet() throws Exception { + //前面代码太长了,省略... + + if (shouldInit()) { + getObject(); + } +``` + +#### ReferenceConfig + +```java + public synchronized T get() { + checkAndUpdateSubConfigs(); + + if (destroyed) { + throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!"); + } + /** + * 前面讲过{@link ReferenceConfig#ref}属性就是代理对象 + */ + if (ref == null) { + // 入口,这里就是核心了 + init(); + } + return ref; // 返回的是Invoke代理 + } + + private void init() { + if (initialized) { + return; + } + //代码太长省略了,主要就是跟服务导出一样,准备参数。比如@Reference注解里的属性还有properties的配置参数等 + + // 关键 + ref = createProxy(map); + + String serviceKey = URL.buildKey(interfaceName, group, version); + ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes)); + initialized = true; + } + + private T createProxy(Map map) { + if (shouldJvmRefer(map)) { + // injvm:// + URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); + invoker = REF_PROTOCOL.refer(interfaceClass, url); + if (logger.isInfoEnabled()) { + logger.info("Using injvm service " + interfaceClass.getName()); + } + } else { + + // 为什么会有urls,因为可以在@Reference的url属性中配置多个url,可以是点对点的服务地址,也可以是注册中心的地址 + urls.clear(); // reference retry init will add url to urls, lead to OOM + // @Reference中指定了url属性, + // @Reference上标注的属性是在Spring启动时就扫描进了相应的属性里,所以这里如果配置了就会有 + // 前面讲过扫描到这些东西赋值给对应的属性 + if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. + String[] us = SEMICOLON_SPLIT_PATTERN.split(url); // 用;号切分 + if (us != null && us.length > 0) { + for (String u : us) { + URL url = URL.valueOf(u); + if (StringUtils.isEmpty(url.getPath())) { + url = url.setPath(interfaceName); + } + + // 如果是注册中心地址,则在url中添加一个refer参数 + if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { + // map表示消费者端配置的参数 + urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); + } else { + // 如果是服务地址 + // 有可能url中配置了参数,map中表示的服务消费者消费服务时的参数,所以需要合并 + urls.add(ClusterUtils.mergeUrl(url, map)); + } + } + } + } else { // assemble URL from register center's configuration + // @Reference中的protocol属性表示使用哪个协议调用服务,如果不是本地调用协议injvm://,则把注册中心地址找出来 + // 对于injvm://协议已经在之前的逻辑中就已经生成invoke了 + // if protocols not injvm checkRegistry + if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) { + checkRegistry(); + // 加载注册中心地址 + List us = loadRegistries(false); + if (CollectionUtils.isNotEmpty(us)) { + for (URL u : us) { + URL monitorUrl = loadMonitor(u); + if (monitorUrl != null) { + map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString())); + } + // 对于注册中心地址都添加REFER_KEY + urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map))); + } + } + if (urls.isEmpty()) { + throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config to your spring config."); + } + } + } + + /** + * 1.如果URL是registry,表明这是注册中心相关的调用进来的,一般来说注册中心只有一个, + * 就会进入这个方法 + * 2.下面还是会用到SPI机制,就是首先调用Protocol接口的两个wrapper包装。debug时候的顺序是 + * ProtocolListenerWrapper -> ProtocolFilterWrapper + * 3.然后根据SPI机制,由于URL是Registry,就调用RegistryProtocol.refer() + */ + if (urls.size() == 1) { + // RegistryProtocol.refer() 或者 DubboProtocol.refer() + invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); + // MockClusterInvoker-->FailoverClusterInvoker-->RegistryDirectory + // --->RegistryDirectory$InvokerDelegate-->ListenerInvokerWrapper-->ProtocolFilterWrapper$CallbackRegistrationInvoker-->ConsumerContextFilter-->FutureFilter-->MonitorFilter-->AsyncToSyncInvoker-->DubboInvoker + // --->RegistryDirectory$InvokerDelegate-->ListenerInvokerWrapper-->ProtocolFilterWrapper$CallbackRegistrationInvoker-->ConsumerContextFilter-->FutureFilter-->MonitorFilter-->AsyncToSyncInvoker-->DubboInvoker + } else { + // 如果有多个url + // 1. 根据每个url,refer得到对应的invoker + // 2. 如果这多个urls中存在注册中心url,则把所有invoker整合为RegistryAwareClusterInvoker,该Invoker在调用时, + // 会查看所有Invoker中是否有默认的,如果有则使用默认的Invoker,如果没有,则使用第一个Invoker + // 3. 如果这多个urls中不存在注册中心url,则把所有invoker整合为FailoverCluster + + List> invokers = new ArrayList>(); + URL registryURL = null; // 用来记录urls中最后一个注册中心url + for (URL url : urls) { + invokers.add(REF_PROTOCOL.refer(interfaceClass, url)); + + if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { + registryURL = url; // use last registry url + } + } + + // 如果存在注册中心地址 + if (registryURL != null) { // registry url is available + // use RegistryAwareCluster only when register's CLUSTER is available + URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME); + // StaticDirectory表示静态服务目录,里面的invokers是不会变的, 生成一个RegistryAwareCluster + // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker + /** + * + * {@link org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper#join(Directory)} + * {@link org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker} + * {@link RegistryAwareCluster} + * {@link org.apache.dubbo.rpc.cluster.support.RegistryAwareClusterInvoker} + * 1.这里会先调用MockClusterWrapper#join生成一个MockClusterInvoker(为什么先调用wrapper前面讲过) + * 2.接着在MockClusterWrapper#join方法中会继续调用join方法,通过SPI机制,实际调用的是 + * RegistryAwareCluster生成RegistryAwareClusterInvoker + * + */ + invoker = CLUSTER.join(new StaticDirectory(u, invokers)); + } else { // not a registry url, must be direct invoke. + // 如果不存在注册中心地址, 生成一个FailoverClusterInvoker + invoker = CLUSTER.join(new StaticDirectory(invokers)); + } + } + } + + if (shouldCheck() && !invoker.isAvailable()) { + throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); + } + if (logger.isInfoEnabled()) { + logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); + } + /** + * @since 2.7.0 + * ServiceData Store + */ + MetadataReportService metadataReportService = null; + if ((metadataReportService = getMetadataReportService()) != null) { + URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map); + metadataReportService.publishConsumer(consumerURL); + } + // create service proxy + return (T) PROXY_FACTORY.getProxy(invoker); + } + +``` + + + + + +Dubbo官方给的Demo没有配置URL,所以这里就是NULL + + + + + + + + + + + +#### MockClusterWrapper + +```java + public class MockClusterWrapper implements Cluster { + + private Cluster cluster; + + public MockClusterWrapper(Cluster cluster) { + this.cluster = cluster; + } + + @Override + public Invoker join(Directory directory) throws RpcException { + return new MockClusterInvoker(directory, + this.cluster.join(directory)); + } + + } +``` + +#### RegistryAwareCluster + +```java + public class RegistryAwareCluster implements Cluster { + + public final static String NAME = "registryaware"; + + @Override + public Invoker join(Directory directory) throws RpcException { + return new RegistryAwareClusterInvoker(directory); + } + + } +``` + +#### RegistryAwareClusterInvoker + +```java + public class RegistryAwareClusterInvoker extends AbstractClusterInvoker { + + private static final Logger logger = LoggerFactory.getLogger(RegistryAwareClusterInvoker.class); + + public RegistryAwareClusterInvoker(Directory directory) { + super(directory); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { + // First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. + // 如果在引入服务时,存在多个invoker,并且某个invoker是default的,则在调用时会使用该invoker,不会调用其它invoker + //就比如说配置文件写了两个注册中心,然后会生成 + /** + * 1.如果在引入服务时,存在多个invoker,并且某个invoker是default的, + * 则在调用时会使用该invoker,不会调用其它invoker + * 2.比如说配置文件写了两个注册中心,一个redis和一个zookeeper + */ + for (Invoker invoker : invokers) { + if (invoker.isAvailable() && invoker.getUrl().getParameter(REGISTRY_KEY + "." + DEFAULT_KEY, false)) { + return invoker.invoke(invocation); + } + } + + // If none of the invokers has a local signal, pick the first one available. + // 如果没有default,则取第一个 + for (Invoker invoker : invokers) { + // 如果对应的注册中心中没有当前调用的服务信息,则不可用 + if (invoker.isAvailable()) { + return invoker.invoke(invocation); + } + } + throw new RpcException("No provider available in " + invokers); + } + } +``` + + + + + + + +### 源码分析-监听注册中心并第一次拉取provider配置 + +#### RegistryProtocol + +```java + public Invoker refer(Class type, URL url) throws RpcException { + + // 从registry://的url中获取对应的注册中心,比如zookeeper + // url由 registry:// 改变为---> zookeeper:// + url = URLBuilder.from(url) + .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) + .removeParameter(REGISTRY_KEY) + .build(); + + // 拿到注册中心实现,ZookeeperRegistry + Registry registry = registryFactory.getRegistry(url); + + // 下面这个代码,通过过git历史提交记录是用来解决SimpleRegistry不可用的问题,不用管这里 + if (RegistryService.class.equals(type)) { + return proxyFactory.getInvoker((T) registry, type, url); + } + + // qs表示 queryString, 表示url中的参数,表示消费者引入服务时@Reference所配置的参数 + // 就是把URL中的参数变成kv形式的map + Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); + + // group="a,b" or group="*" + // https://dubbo.apache.org/zh/docs/v2.7/user/examples/group-merger/ + String group = qs.get(GROUP_KEY); + if (group != null && group.length() > 0) { + if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { + // group有多个值,这里的cluster为MergeableCluster + return doRefer(getMergeableCluster(), registry, type, url); + } + } + + // 这里的cluster是cluster的Adaptive对象,扩展点 + return doRefer(cluster, registry, type, url); + } + + private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { + // RegistryDirectory表示动态服务目录,会和注册中心的数据保持同步 + // type表示一个服务对应一个RegistryDirectory,url表示注册中心地址 + // 在消费端,最核心的就是RegistryDirectory + RegistryDirectory directory = new RegistryDirectory(type, url); + directory.setRegistry(registry); + directory.setProtocol(protocol); + + + // all attributes of REFER_KEY + // 引入服务所配置的参数 + Map parameters = new HashMap(directory.getUrl().getParameters()); + + // 消费者url + URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); + if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { + directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); + + // 注册简化后的消费url + registry.register(directory.getRegisteredConsumerUrl()); + } + + // 构造路由链,路由链会在引入服务时按路由条件进行过滤 + // 路由链是动态服务目录中的一个属性,通过路由链可以过滤某些服务提供者 + directory.buildRouterChain(subscribeUrl); + + /** + * 1.服务目录需要订阅监听的几个路径 + * 当前应用所对应的动态配置目录:/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators + * 当前所引入的服务的动态配置目录:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators + * 当前所引入的服务的提供者目录:/dubbo/org.apache.dubbo.demo.DemoService/providers + * 当前所引入的服务的老版本动态配置目录:/dubbo/org.apache.dubbo.demo.DemoService/configurators + * 当前所引入的服务的老版本路由器目录:/dubbo/org.apache.dubbo.demo.DemoService/routers + * 这里由于Dubbo新老版本的兼容,导致监听路径看着有点乱 + * 2.在订阅监听指定路径这一步也会直接把相应路径下的配置拉下来, + */ + directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, + PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); + + // 利用传进来的cluster,join得到invoker, MockClusterWrapper + Invoker invoker = cluster.join(directory); + ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); + return invoker; + } +``` + + + +#### RegistryDirectory + +```java + public void subscribe(URL url) { + setConsumerUrl(url); + CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 监听consumer应用 + serviceConfigurationListener = new ReferenceConfigurationListener(this, url); // 监听所引入的服务的动态配置 + // 这里又是SPI,会先调用FailbackRegistry + registry.subscribe(url, this); + } +``` + +1. 这里再讲一下,为什么这里会先调用FailbackRegistry。 +2. 这里的SPI机制是根据this对象的registry属性来决定的,this是RegistryDirectory对象,RegistryDirectory实现了NotifyListener接口 +3. 看下面的截图,registry属性是zookeeper的URL,所以应该是要调用ZookeeperRegistry的subscribe()方法,但是ZookeeperRegistry没有这个方法,所以我们就要找它的父类了,也就是FailbackRegistry, +4. 然后再调用doSubscribe(),ZookeeperRegistry重写了此方法,很明显这是个模板模式。 + + + +#### FailbackRegistry + +```java + public void subscribe(URL url, NotifyListener listener) { + super.subscribe(url, listener); + removeFailedSubscribed(url, listener); + try { + // Sending a subscription request to the server side + doSubscribe(url, listener); + } catch (Exception e) { + Throwable t = e; + + List urls = getCacheUrls(url); + if (CollectionUtils.isNotEmpty(urls)) { + notify(url, listener, urls); + logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); + } else { + // If the startup detection is opened, the Exception is thrown directly. + boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) + && url.getParameter(Constants.CHECK_KEY, true); + boolean skipFailback = t instanceof SkipFailbackWrapperException; + if (check || skipFailback) { + if (skipFailback) { + t = t.getCause(); + } + throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); + } else { + logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); + } + } + + // Record a failed registration request to a failed list, retry regularly + // 添加listener,向zk添加监听器时如果报错了,那么会把这个listener添加到failedSubscribed中,并会定时重试(重新注册listener) + addFailedSubscribed(url, listener); + } + } +``` + +#### ZookeeperRegistry + +```java +// 进行订阅,先看父类的subscribe方法 + @Override + public void doSubscribe(final URL url, final NotifyListener listener) { + try { + if (ANY_VALUE.equals(url.getServiceInterface())) { + // 订阅所有服务 + + String root = toRootPath(); + ConcurrentMap listeners = zkListeners.get(url); + if (listeners == null) { + zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); + listeners = zkListeners.get(url); + } + ChildListener zkListener = listeners.get(listener); + if (zkListener == null) { + listeners.putIfAbsent(listener, (parentPath, currentChilds) -> { + for (String child : currentChilds) { + child = URL.decode(child); + if (!anyServices.contains(child)) { + anyServices.add(child); + subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child, + Constants.CHECK_KEY, String.valueOf(false)), listener); + } + } + }); + zkListener = listeners.get(listener); + } + zkClient.create(root, false); + List services = zkClient.addChildListener(root, zkListener); + if (CollectionUtils.isNotEmpty(services)) { + for (String service : services) { + service = URL.decode(service); + anyServices.add(service); + subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service, + Constants.CHECK_KEY, String.valueOf(false)), listener); + } + } + } else { + // 单独订阅某一个服务 + + List urls = new ArrayList<>(); + // 得到真正要监听的zk上的路径, + for (String path : toCategoriesPath(url)) { + // 根据监听地址去拿listeners,如果没有则生成 + ConcurrentMap listeners = zkListeners.get(url); + if (listeners == null) { + zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); + listeners = zkListeners.get(url); + } + + // 一个NotifyListener对应一个ChildListener + ChildListener zkListener = listeners.get(listener); + if (zkListener == null) { + // lambda表达式就是监听逻辑, parentPath表示父path,currentChilds表示当前拥有的child, 会调用notify方法进行实际的处理 + listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); + zkListener = listeners.get(listener); + } + // 创建zk上路径 + zkClient.create(path, false); + + // 添加真正跟zk相关的ChildListener,ChildListener中的逻辑就是监听到zk上数据发生了变化后会触发的逻辑 + List children = zkClient.addChildListener(path, zkListener); + if (children != null) { + urls.addAll(toUrlsWithEmpty(url, path, children)); + } + } + // 这里的urls就是从现在所引入的服务的目录下先主动拉一次配置,比如下面这个三个目录下的路径 +// "/dubbo/org.apache.dubbo.demo.DemoService/providers" +// "/dubbo/org.apache.dubbo.demo.DemoService/configurators" +// "/dubbo/org.apache.dubbo.demo.DemoService/routers" + notify(url, listener, urls); + } + } catch (Throwable e) { + throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); + } + } +``` + +#### FailbackRegistry + +这里再调用父类的通知方法,先主动把注册中心上的配置拉下来一次 + +```java + /** + * 接收到通知,处理通知的方法 + * @param url 被监听的url + * @param listener 监听器 + * @param urls 要么有一个empty://,要么有一个或多个override://协议 + */ + @Override + protected void notify(URL url, NotifyListener listener, List urls) { + if (url == null) { + throw new IllegalArgumentException("notify url == null"); + } + if (listener == null) { + throw new IllegalArgumentException("notify listener == null"); + } + try { + doNotify(url, listener, urls); + } catch (Exception t) { + // 处理通知失败 + // Record a failed registration request to a failed list, retry regularly + addFailedNotified(url, listener, urls); + logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); + } + } + + protected void doNotify(URL url, NotifyListener listener, List urls) { + super.notify(url, listener, urls); + } + +``` + +#### AbstractRegistry + +```java + protected void notify(URL url, NotifyListener listener, List urls) { + if (url == null) { + throw new IllegalArgumentException("notify url == null"); + } + if (listener == null) { + throw new IllegalArgumentException("notify listener == null"); + } + if ((CollectionUtils.isEmpty(urls)) + && !ANY_VALUE.equals(url.getServiceInterface())) { + logger.warn("Ignore empty notify urls for subscribe url " + url); + return; + } + if (logger.isInfoEnabled()) { + logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); + } + // keep every provider's category. + Map> result = new HashMap<>(); + for (URL u : urls) { + if (UrlUtils.isMatch(url, u)) { + String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); + List categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); + categoryList.add(u); + } + } + if (result.size() == 0) { + return; + } + Map> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); + for (Map.Entry> entry : result.entrySet()) { + String category = entry.getKey(); + List categoryList = entry.getValue(); + categoryNotified.put(category, categoryList); + //从这里进 + listener.notify(categoryList); + // We will update our cache file after each notification. + // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL. + saveProperties(url); + } + } +``` + +最终走到了这一步 + + + +#### RegistryDirectory + +```java + public synchronized void notify(List urls) { + Map> categoryUrls = urls.stream() + .filter(Objects::nonNull) + .filter(this::isValidCategory) + .filter(this::isNotCompatibleFor26x) + .collect(Collectors.groupingBy(url -> { + if (UrlUtils.isConfigurator(url)) { + return CONFIGURATORS_CATEGORY; + } else if (UrlUtils.isRoute(url)) { + return ROUTERS_CATEGORY; + } else if (UrlUtils.isProvider(url)) { + return PROVIDERS_CATEGORY; + } + return ""; + })); + + // 获取动态配置URL,生成configurators + List configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); + this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); + + // 获取老版本路由URL,生成Router,并添加到路由链中 + List routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); + toRouters(routerURLs).ifPresent(this::addRouters); + + // 获取服务提供者URL + List providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); + refreshOverrideAndInvoker(providerURLs); + } + + private void refreshOverrideAndInvoker(List urls) { + // mock zookeeper://xxx?mock=return null + overrideDirectoryUrl(); + refreshInvoker(urls); + } + + private void refreshInvoker(List invokerUrls) { //http:// dubbo:// + Assert.notNull(invokerUrls, "invokerUrls should not be null"); + + if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { + this.forbidden = true; // Forbid to access + this.invokers = Collections.emptyList(); + routerChain.setInvokers(this.invokers); + destroyAllInvokers(); // Close all invokers + } else { + this.forbidden = false; // Allow to access + Map> oldUrlInvokerMap = this.urlInvokerMap; // local reference + if (invokerUrls == Collections.emptyList()) { + invokerUrls = new ArrayList<>(); + } + if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { + invokerUrls.addAll(this.cachedInvokerUrls); + } else { + this.cachedInvokerUrls = new HashSet<>(); + this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison + } + if (invokerUrls.isEmpty()) { + return; + } + // 这里会先按Protocol进行过滤,并且调用DubboProtocol.refer方法得到DubboInvoker + Map> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map + + /** + * If the calculation is wrong, it is not processed. + * + * 1. The protocol configured by the client is inconsistent with the protocol of the server. + * eg: consumer protocol = dubbo, provider only has other protocol services(rest). + * 2. The registration center is not robust and pushes illegal specification data. + * + */ + if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { + logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls + .toString())); + return; + } + + List> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); + // pre-route and build cache, notice that route cache should build on original Invoker list. + // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. + // 得到了所引入的服务Invoker之后,把它们设置到路由链中去,在调用时使用,并且会调用TagRouter的notify方法 + routerChain.setInvokers(newInvokers); + this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; + this.urlInvokerMap = newUrlInvokerMap; + + try { + destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker + } catch (Exception e) { + logger.warn("destroyUnusedInvokers error. ", e); + } + } + } + + private Map> toInvokers(List urls) { + Map> newUrlInvokerMap = new HashMap<>(); + if (urls == null || urls.isEmpty()) { + return newUrlInvokerMap; + } + Set keys = new HashSet<>(); + String queryProtocols = this.queryMap.get(PROTOCOL_KEY); + + // 遍历当前服务所有的服务提供者URL + for (URL providerUrl : urls) { + // If protocol is configured at the reference side, only the matching protocol is selected + if (queryProtocols != null && queryProtocols.length() > 0) { + boolean accept = false; + String[] acceptProtocols = queryProtocols.split(","); + + // 当前消费者如果手动配置了Protocol,那么则进行匹配 + for (String acceptProtocol : acceptProtocols) { + if (providerUrl.getProtocol().equals(acceptProtocol)) { + accept = true; + break; + } + } + if (!accept) { + continue; + } + } + if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { + continue; + } + + // 当前Protocol是否在应用中存在对应的扩展点 + if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { + logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); + continue; + } + //重要 + URL url = mergeUrl(providerUrl); + + String key = url.toFullString(); // The parameter urls are sorted + if (keys.contains(key)) { // Repeated url + continue; + } + keys.add(key); + // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again + Map> localUrlInvokerMap = this.urlInvokerMap; // local reference + Invoker invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); + + // 如果当前服务提供者URL没有生产过Invoker + if (invoker == null) { // Not in the cache, refer again + try { + boolean enabled = true; + if (url.hasParameter(DISABLED_KEY)) { + enabled = !url.getParameter(DISABLED_KEY, false); + } else { + enabled = url.getParameter(ENABLED_KEY, true); + } + if (enabled) { + // 调用Protocol的refer方法得到一个Invoker DubboProtocol.refer() + invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); + } + } catch (Throwable t) { + logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); + } + if (invoker != null) { // Put new invoker in cache + newUrlInvokerMap.put(key, invoker); + } + } else { + newUrlInvokerMap.put(key, invoker); + } + } + keys.clear(); + return newUrlInvokerMap; + } + // 这里就是根据优先级,把动态配置和服务提供者URL等等还有其它一些配置URL合并,得到真正能转换成DubboInvoker的URL + private URL mergeUrl(URL providerUrl) { + providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters + + //动态配置的内容去覆盖 + providerUrl = overrideWithConfigurator(providerUrl); + + providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker! + + // The combination of directoryUrl and override is at the end of notify, which can't be handled here + this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters + + if ((providerUrl.getPath() == null || providerUrl.getPath() + .length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0 + //fix by tony.chenl DUBBO-44 + String path = directoryUrl.getParameter(INTERFACE_KEY); + if (path != null) { + int i = path.indexOf('/'); + if (i >= 0) { + path = path.substring(i + 1); + } + i = path.lastIndexOf(':'); + if (i >= 0) { + path = path.substring(0, i); + } + providerUrl = providerUrl.setPath(path); + } + } + return providerUrl; + } +``` + + + +### 源码分析-路由链生成 + +#### 构造路由链概述 + +1. RouterChain.buildChain(url)方法赋值得到路由链。这里的url是这样的: + +```html +consumer://192.168.0.100/org.apache.dubbo.demo.DemoService?application=dubbo-demo-consumer-application +&dubbo=2.0.2&group=g1&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=19852 +&release=2.7.0&revision=1.1.1&side=consumer&sticky=false×tamp=1591332529643&version=1.1.1 +``` + +2. 表示所引入的服务的参数,在获得路由链时就要根据这些参数去匹配得到符合当前的服务的Router. + + + +**大致源码过程:** + +1. RouterChain.buildChain(url) +2. new RouterChain<>(url) +3. List extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class).getActivateExtension(url, (String\[\]) null);根据url去获取可用的RouterFactory,可以拿到四个: + + 1. MockRouterFactory:Mock路由,没有order,相当于order=0 + 2. TagRouterFactory: 标签路由,order = 100 + 3. AppRouterFactory: 应用条件路由,order = 200 + 4. ServiceRouterFactory: 服务条件路由,order = 300 + +4. 遍历每个RouterFactory,调用getRouter(url)方法得到Router,存到List routers中 +5. 对routers按order从小到大的顺序进行排序 + + + +**文字描述:** + +1. AppRouter和ServiceRouter是非常类似,他们的父类都是ListenableRouter,在创建AppRouter和ServiceRouter时,会绑定一个监听器,比如: + 1. AppRouter监听的是:/dubbo/config/dubbo/dubbo-demo-consumer-application.condition-router节点的内容 + 2. ServiceRouter监听的是:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.condition-router节点的内容 + +2. 绑定完监听器之后,会主动去获取一下对应节点的内容(也就是所配置的路由规则内容),然后会去解析内容得到ConditionRouterRule routerRule,再调用generateConditions(routerRule);方法解析出一个或多个ConditionRouter,并存入到List conditionRouters中。 +3. 注意routerRule和conditionRouters是ListenableRouter的属性,就是在AppRouter和ServiceRouter中的。 +4. 对于TagRouter就比较特殊,首先标签路由是用在,当消费者在调用某个服务时,通过在请求中设置标签,然后根据所设置的标签获得可用的服务提供者地址。而且目前TagRouter只支持应用级别的配置(而且是服务提供者应用,给某个服务提供者应用打标)。 +5. 所以对于服务消费者而言,在引用某个服务时,需要知道提供这个服务的应用名,然后去监听这个应用名对应的.tag-router节点内容,比如/dubbo/config/dubbo/dubbo-demo-provider-application.tag-router。 +6. 那么问题来了,怎么才能知道提供这个服务的服务提供者的应用名呢?答案是,需要先获取到当前所引入服务的服务提供者URL,从URL中得到服务提供者的应用名。拿到应用名之后才能去应用名对应的.tag-router节点去绑定监听器。 +7. 这就是TagRouter和AppRouter、ServiceRouter的区别,对于AppRouter而言,监听的是本消费者应用的路由规则,对于ServiceRouter而言,监听的是所引入服务的路由规则,都比较简单。 +8. 所以,TagRouter是在引入服务时,获取到服务的提供者URL之后,才会去监听.tag-router节点中的内容,并手动获取一次节点中的内容,设置TagRouter对象中tagRouterRule属性,表示标签路由规则。 + +到此,路由链构造完毕。 + + + + + + + + + + + +RegistryProtocol + + +```java + private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { + // RegistryDirectory表示动态服务目录,会和注册中心的数据保持同步 + // type表示一个服务对应一个RegistryDirectory,url表示注册中心地址 + // 在消费端,最核心的就是RegistryDirectory + RegistryDirectory directory = new RegistryDirectory(type, url); + directory.setRegistry(registry); + directory.setProtocol(protocol); + // all attributes of REFER_KEY + // 引入服务所配置的参数 + Map parameters = new HashMap(directory.getUrl().getParameters()); + + // 消费者url + URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); + if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { + directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); + + // 注册简化后的消费url + registry.register(directory.getRegisteredConsumerUrl()); + } + + // 构造路由链,路由链会在引入服务时按路由条件进行过滤 + // 路由链是动态服务目录中的一个属性,通过路由链可以过滤某些服务提供者 + directory.buildRouterChain(subscribeUrl); + + directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, + PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); + + // 利用传进来的cluster,join得到invoker, MockClusterWrapper + Invoker invoker = cluster.join(directory); + ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); + return invoker; +} +``` + +#### RegistryDirectory + +```java + public void buildRouterChain(URL url) { + this.setRouterChain(RouterChain.buildChain(url)); + } +``` + +#### RouterChain + +```java + public static RouterChain buildChain(URL url) { + return new RouterChain<>(url); + } + + private RouterChain(URL url) { + // 拿到RouterFactory接口有哪些扩展实现类,比如默认情况下就有四个: + // 0 = {MockRouterFactory@2880} + // 1 = {TagRouterFactory@2881} // 标签路由 + // 2 = {AppRouterFactory@2882} // 应用条件路由 + // 3 = {ServiceRouterFactory@2883} // 服务条件路由 + List extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class) + .getActivateExtension(url, (String[]) null); + + // 然后利用RouterFactory根据url生成各个类型的Router + // 这里生产的routers已经是真实可用的了,但是有个比较特殊的: + // 对于应用条件路由和服务条件路由对于的Router对象,对象内部已经有真实可用的数据了(数据已经从配置中心得到了) + // 但是对于标签路由则没有,它暂时还相当于一个没有内容的对象(还没有从配置中心获取标签路由的数据) + List routers = extensionFactories.stream() + .map(factory -> factory.getRouter(url)) + .collect(Collectors.toList()); + + // 把routers按priority进行排序 + initWithRouters(routers); + } + + public void setInvokers(List> invokers) { + this.invokers = (invokers == null ? Collections.emptyList() : invokers); + routers.forEach(router -> router.notify(this.invokers)); + } +``` + + + +#### AppRouterFactory + +> 以AppRouterFactory为例 + +```java + @Activate(order = 200) + public class AppRouterFactory implements RouterFactory { + public static final String NAME = "app"; + + private volatile Router router; + + @Override + public Router getRouter(URL url) { + if (router != null) { + return router; + } + synchronized (this) { + if (router == null) { + router = createRouter(url); + } + } + return router; + } + + private Router createRouter(URL url) { + // 内部会进行初始化 + return new AppRouter(DynamicConfiguration.getDynamicConfiguration(), url); + } + } +``` + +#### AppRouter + +```java + public class AppRouter extends ListenableRouter { + public static final String NAME = "APP_ROUTER"; + /** + * AppRouter should after ServiceRouter + */ + private static final int APP_ROUTER_DEFAULT_PRIORITY = 150; + + public AppRouter(DynamicConfiguration configuration, URL url) { + // 拿到应用名 + super(configuration, url, url.getParameter(CommonConstants.APPLICATION_KEY)); + this.priority = APP_ROUTER_DEFAULT_PRIORITY; + } + } +``` + +#### ListenableRouter + +```java + public ListenableRouter(DynamicConfiguration configuration, URL url, String ruleKey) { + super(configuration, url); + this.force = false; + // ruleKey为服务名或应用名 + // 初始化,会绑定一个监听器,负责监听配置中心条件路由的修改,并且会主动从配置中心获取一下当前条件路由的数据并做解析 + this.init(ruleKey); + } + + private synchronized void init(String ruleKey) { + if (StringUtils.isEmpty(ruleKey)) { + return; + } + // 服务名+".condition-router",或 应用名+".condition-router" + String routerKey = ruleKey + RULE_SUFFIX; + // 绑定一个监听器去监听routerKey对应的路径,当前类ListenableRouter就自带了一个监听器 + configuration.addListener(routerKey, this); + + // 绑定完监听器后,主动的从配置中心获取一下当前服务或消费者应用的对应的路由配置 + String rule = configuration.getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP); + + if (StringUtils.isNotEmpty(rule)) { + // 手动调用监听器处理事件的方法process() + this.process(new ConfigChangeEvent(routerKey, rule)); + } + } + + public synchronized void process(ConfigChangeEvent event) { + if (logger.isInfoEnabled()) { + logger.info("Notification of condition rule, change type is: " + event.getChangeType() + + ", raw rule is:\n " + event.getValue()); + } + + if (event.getChangeType().equals(ConfigChangeType.DELETED)) { + // 如果是一个删除事件,则清空当前Router中的conditionRouters属性,表示当前Router对象中没有路由规则 + routerRule = null; + conditionRouters = Collections.emptyList(); + } else { + try { + // 解析路由规则 + routerRule = ConditionRuleParser.parse(event.getValue()); + // 根据路由规则,生成ConditionRouter-条件路由对象,并赋值给当前Router对象的conditionRouters属性 + generateConditions(routerRule); + } catch (Exception e) { + logger.error("Failed to parse the raw condition rule and it will not take effect, please check " + + "if the condition rule matches with the template, the raw rule is:\n " + event.getValue(), e); + } + } + } +``` + + + +### Invoker总结 + +**MockClusterInvoker**: 完成Mock功能,由MockClusterWrapper生成,MockClusterWrapper是Cluster接口的包装类,通过Cluster.join()方法得到MockClusterInvoker + +**FailoverClusterInvoker**:完成集群容错功能,是MockClusterInvoker的下级 + +**RegistryAwareClusterInvoker**:如果指定了多个注册中心,那么RegistryAwareClusterInvoker完成选择默认的注册中心的进行调用,如果没有指定默认的,则会遍历注册中心进行调用,如果该注册中心没有对应的服务则跳过。 + +**DubboInvoker**:完成Dubbo协议底层发送数据 + +**ProtocolFilterWrapper$CallbackRegistrationInvoker**:完成对filter的调用,ProtocolFilterWrapper是Protocol接口的包装类,通过Protocol.refer()方法得到CallbackRegistrationInvoke。 + + + +### DubboProtocol的服务引入(Refer) + +DubboProtocol中并没有refer方法,是在它的父类AbstractProtocol中才有的refer方法 + + ```java + @Override + public Invoker refer(Class type, URL url) throws RpcException { + // 异步转同步Invoker , type是接口,url是服务地址 + // DubboInvoker是异步的,而AsyncToSyncInvoker会封装为同步的 + return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); + } + ``` + + + +调用protocolBindingRefer()方法得到一个Invoker后,会包装为一个AsyncToSyncInvoker然后作为refer方法的结果返回。 + + + +在DubboProtocol的protocolBindingRefer()方法中会new一个DubboInvoker,然后就返回了。 + + + +在构造DubboInvoker时,有一个非常重要的步骤,构造clients。DubboInvoker作为消费端服务的执行者,在调用服务时,是需要去发送Invocation请求的,而发送请求就需要client,之所以有多个client,是因为DubboProtocol支持多个。 + + + +假如在一个DubboInvoker中有多个Client,那么在使用这个DubboInvoker去调用服务时,就可以提高效率,比如一个服务接口有多个方法,那么在业务代码中,可能会不断的调用该接口中的方法,并且由于DubboProtocol底层会使用异步去发送请求,所以在每次需要发送请求时,就可以从clients轮询一个client去发送这个数据,从而提高效率。 + + + +接下来,来看看clients是如何生成的。 + + + +1. 首先,一个DubboInvoker到底支持多少个Client呢?这是可以配置的,参数为connections,按指定的数字调用initClient(url)得到ExchangeClient。 +2. initClient(url)的实现逻辑为 + + 1. 获取client参数,表示是用netty还是mina等等 + 2. 获取**codec参数,表示数据的编码方式** + 3. **获取****heartbeat参数,表示长连接的心跳时间,超过这个时间服务端没有收到数据则关闭socket,默认为1分钟** + 4. **如果所指定的client没有对应的扩展点,则抛异常** + 5. 获取**lazy参数,默认为false,如果为true,那么则直接返回一个**LazyConnectExchangeClient,表示真正在发送数据时才建立socket + 6. 否则调用Exchangers._connect_(url, **requestHandler**)获得一个client + 7. 在connect()方法中调用HeaderExchanger的connect方法去建立socket连接并得到一个HeaderExchangeClient + 8. 在构造HeaderExchangeClient时需要先执行Transporters._connect_()方法得到一个Client + 9. 会调用NettyTransporter的connect()去构造一个NettyClient + 10. 在构造NettyClient的过程中,会去初始化Netty的客户端,然后连接Server端,建立一个Socket连接 + + + +### 最复杂情况下的Invoker链 + +```java +@Reference(url = "dubbo://192.168.40.17:20881/org.apache.dubbo.demo.DemoService;registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper") +private DemoService demoService; +``` + + + +在@Reference注解上定义了url参数,有两个值 + +1. dubbo://192.168.40.17:20881/org.apache.dubbo.demo.DemoService +2. registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper + + + +最终refer处理的invoker链路为: + + + +* MockClusterInvoker + + * invoker=RegistryAwareClusterInvoker + + * directory=StaticDirectory + + * 0=**ProtocolFilterWrapper$CallbackRegistrationInvoke子流程** + * 1=MockClusterInvoker + + * FailoverClusterInvoker + + * RegistryDirectory + + * invokers=UnmodifiableRandomAccessList size=1 + + * 0=RegistryDirectory$InvokerDelegate + + * **ProtocolFilterWrapper$CallbackRegistrationInvoke子流程** + + + + + +* **ProtocolFilterWrapper$CallbackRegistrationInvoke子流程** + + * filterInvoker=ProtocolFilterWrapper$1 + + * filter=ConsumerContextFilter + * next=ProtocolFilterWrapper$1 + + * filter=FutureFilter + * next=ProtocolFilterWrapper$1 + + * filter=MonitorFilter + * next=ListenerInvokerWrapper + + * invoker=AsyncToSyncInvoker + * invoker=DubboInvoker + + diff --git a/docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md b/docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md new file mode 100644 index 0000000..bdaa6f0 --- /dev/null +++ b/docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md @@ -0,0 +1,572 @@ +--- +title: 07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析 +tags: + - Dubbo + - rpc +categories: + - rpc + - Dubbo源码系列v1 +keywords: Dubbo,rpc +description: 服务调用源码解析 +cover: 'https://cdn.jsdelivr.net/gh/youthlql/youthlql/img/dubbo.png' +abbrlink: 84653c9d +date: 2021-11-09 14:11:58 +--- + + + +## 第七节: Dubbo服务调用源码解析 + + + +### 服务导出的Netty启动源码 + +> 最主要的就是构造一个Handler处理链路 + +#### DubboProtocol + +```java + public Exporter export(Invoker invoker) throws RpcException { + URL url = invoker.getUrl(); + + // 唯一标识一个服务的key + String key = serviceKey(url); + // 构造一个Exporter进行服务导出 + DubboExporter exporter = new DubboExporter(invoker, key, exporterMap); + exporterMap.put(key, exporter); + + // 省略... + + // 开启NettyServer + // 请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务 + openServer(url); + + return exporter; + } + + private void openServer(URL url) { + // find server. + String key = url.getAddress(); // 获得ip地址和port, 192.168.40.17:20880 + + // NettyClient, NettyServer + //client can export a service which's only for server to invoke + boolean isServer = url.getParameter(IS_SERVER_KEY, true); + if (isServer) { + // 缓存Server对象 + ExchangeServer server = serverMap.get(key); + + // DCL,Double Check Lock + if (server == null) { + synchronized (this) { + server = serverMap.get(key); + if (server == null) { + // 创建Server,并进行缓存 + serverMap.put(key, createServer(url)); + } + } + } else { + // server supports reset, use together with override + // 服务重新导出时,就会走这里 + server.reset(url); + } + } + } + + private ExchangeServer createServer(URL url) { + url = URLBuilder.from(url) + // send readonly event when server closes, it's enabled by default + .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) + // enable heartbeat by default + .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) + .addParameter(CODEC_KEY, DubboCodec.NAME) + .build(); + + // 协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等,默认为netty + String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); + + if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { + throw new RpcException("Unsupported server type: " + str + ", url: " + url); + } + + // 通过url绑定端口,和对应的请求处理器 + ExchangeServer server; + try { + // requestHandler是请求处理器,类型为ExchangeHandler + // 表示从url的端口接收到请求后,requestHandler来进行处理 + server = Exchangers.bind(url, requestHandler); + } catch (RemotingException e) { + throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); + } + + // 协议的客户端实现类型,比如:dubbo协议的mina,netty等 + str = url.getParameter(CLIENT_KEY); + if (str != null && str.length() > 0) { + Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); + if (!supportedTypes.contains(str)) { + throw new RpcException("Unsupported client type: " + str); + } + } + + return server; + } +``` + +1. NettyClient<------>Socket连接,数据传输层<------>NettyServer。Netty这两端只要建立了连接就可以互相发送数据。 +2. ExchangeClient------数据交换层------ExchangeServer。这是Dubbo抽象出来的概念,主要就是抽象出了请求和响应这两个概念。 +3. ExchangeXXX里面包了Netty的东西 + + + +```java +private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { + + @Override + public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { + + if (!(message instanceof Invocation)) { + throw new RemotingException(channel, "Unsupported request: " + + (message == null ? null : (message.getClass().getName() + ": " + message)) + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + } + + // 转成Invocation对象,要开始用反射执行方法了 + Invocation inv = (Invocation) message; + Invoker invoker = getInvoker(channel, inv); // 服务实现者 + + // need to consider backward-compatibility if it's a callback + if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { + String methodsStr = invoker.getUrl().getParameters().get("methods"); + boolean hasMethod = false; + if (methodsStr == null || !methodsStr.contains(",")) { + hasMethod = inv.getMethodName().equals(methodsStr); + } else { + String[] methods = methodsStr.split(","); + for (String method : methods) { + if (inv.getMethodName().equals(method)) { + hasMethod = true; + break; + } + } + } + if (!hasMethod) { + logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + + " not found in callback service interface ,invoke will be ignored." + + " please update the api interface. url is:" + + invoker.getUrl()) + " ,invocation is :" + inv); + return null; + } + } + // 这里设置了,service中才能拿到remoteAddress + RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); + // 执行服务,得到结果 + Result result = invoker.invoke(inv); + // 返回一个CompletableFuture + return result.completionFuture().thenApply(Function.identity()); + } + + @Override + public void received(Channel channel, Object message) throws RemotingException { + if (message instanceof Invocation) { + // 这是服务端接收到Invocation时的处理逻辑 + reply((ExchangeChannel) channel, message); + + } else { + super.received(channel, message); + } + } + + private void invoke(Channel channel, String methodKey) { + Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); + if (invocation != null) { + try { + received(channel, invocation); + } catch (Throwable t) { + logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); + } + } + } +}; + + + + Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException { + boolean isCallBackServiceInvoke = false; + boolean isStubServiceInvoke = false; + int port = channel.getLocalAddress().getPort(); + String path = inv.getAttachments().get(PATH_KEY); + + // if it's callback service on client side + isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY)); + if (isStubServiceInvoke) { + port = channel.getRemoteAddress().getPort(); + } + + //callback + isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; + if (isCallBackServiceInvoke) { + path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY); + inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); + } + + // 从请求中拿到serviceKey,从exporterMap中拿到已经导出了的服务 + String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY)); + DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); + + if (exporter == null) { + throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + } + + // 拿到服务对应的Invoker + return exporter.getInvoker(); + } +``` + + + +#### Exchangers + +```java + public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { + if (url == null) { + throw new IllegalArgumentException("url == null"); + } + if (handler == null) { + throw new IllegalArgumentException("handler == null"); + } + // codec表示协议编码方式 + url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); + // 通过url得到HeaderExchanger, 利用HeaderExchanger进行bind,将得到一个HeaderExchangeServer + return getExchanger(url).bind(url, handler); + } +``` + +#### HeaderExchange + +```java + @Override + public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { + + // 下面会去启动Netty + // 对handler包装了两层,表示当处理一个请求时,每层Handler负责不同的处理逻辑 + // 为什么在connect和bind时都是DecodeHandler,解码,解的是把InputStream解析成RpcInvocation对象 + // DecodeHandler -> HeaderExchangeHandler -> DubboProtocol(ExchangeHandlerAdapter) 一层一层包 + // 上面的handler处理完了交给下面的handler + return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); + } +``` + +#### HeaderExchangeServer + +HeaderExchangeServer里有一个server属性,这个server就是NettyServer + +```java + private final Server server; + //启动netty的时候会调用这个 + public HeaderExchangeServer(Server server) { + Assert.notNull(server, "server == null"); + this.server = server; + // 启动定义关闭Channel(socket)的Task + startIdleCheckTask(getUrl()); + } +``` + +#### Transporters + +```java + public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { + if (url == null) { + throw new IllegalArgumentException("url == null"); + } + if (handlers == null || handlers.length == 0) { + throw new IllegalArgumentException("handlers == null"); + } + + // 如果bind了多个handler,那么当有一个连接过来时,会循环每个handler去处理连接 + ChannelHandler handler; + if (handlers.length == 1) { + handler = handlers[0]; + } else { + handler = new ChannelHandlerDispatcher(handlers); + } + + // 调用NettyTransporter去绑定,Transporter表示网络传输层 + return getTransporter().bind(url, handler); + } + + public static Transporter getTransporter() { + //@SPI默认配置的就是netty + return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); + } +``` + +#### NettyTransporter + +```java + @Override + public Server bind(URL url, ChannelHandler listener) throws RemotingException { + return new NettyServer(url, listener); + } +``` + +#### NettyServer + +```java + public NettyServer(URL url, ChannelHandler handler) throws RemotingException { + //多个handler一层一层的包装,有点像责任链模式,这个handler处理完了,交给下一个handler + super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); + } +``` + +#### ChannelHandlers + +```java +public class ChannelHandlers { + // 单例模式 + private static ChannelHandlers INSTANCE = new ChannelHandlers(); + + protected ChannelHandlers() { + } + + public static ChannelHandler wrap(ChannelHandler handler, URL url) { + return ChannelHandlers.getInstance().wrapInternal(handler, url); + } + + protected static ChannelHandlers getInstance() { + return INSTANCE; + } + + static void setTestingChannelHandlers(ChannelHandlers instance) { + INSTANCE = instance; + } + + protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { + // 先通过ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url) + // 得到一个AllChannelHandler(handler, url) + // 然后把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler + // 所以当Netty接收到一个数据时,会经历MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler + // 而AllChannelHandler会调用handler + return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) + .getAdaptiveExtension().dispatch(handler, url))); + } + +} +``` + +然后回到NettyServer调用super(XXX),走到AbstractServer + +#### AbstractServer + +```java + public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { + super(url, handler); + localAddress = getUrl().toInetSocketAddress(); + + String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); + int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); + if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { + bindIp = ANYHOST_VALUE; + } + bindAddress = new InetSocketAddress(bindIp, bindPort); + this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); + this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT); + try { + doOpen();//走到NettyServer + if (logger.isInfoEnabled()) { + logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); + } + } catch (Throwable t) { + throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); + } + //fixme replace this with better method + DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); + executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); + } +``` + +#### NettyServer + +```java + protected void doOpen() throws Throwable { + bootstrap = new ServerBootstrap(); + + bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); + workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), + new DefaultThreadFactory("NettyServerWorker", true)); + + //最终再包装一个NettyServerHandler,这个就是最外层的Handler,请求来了它是第一个处理的 + final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); + channels = nettyServerHandler.getChannels(); + + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) + .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel ch) throws Exception { + // FIXME: should we use getTimeout()? + int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); + // 这里就会拿到DubboCodec,接收到数据之后就会进行解码 + NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); + ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug + .addLast("decoder", adapter.getDecoder()) + .addLast("encoder", adapter.getEncoder()) + .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) + .addLast("handler", nettyServerHandler); + } + }); + // bind + ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); + channelFuture.syncUninterruptibly(); + channel = channelFuture.channel(); + + } +``` + + + +#### 分割 + +### 服务提供端执行逻辑 + +#### 概述 + +1. NettyServerHandler:接收数据 +2. MultiMessageHandler:判断接收到的数据是否是MultiMessage,如果是则获取MultiMessage中的单个Message,传递给HeartbeatHandler进行处理 +3. HeartbeatHandler:判断是不是心跳消息,如果是不是则把Message传递给AllChannelHandler +4. AllChannelHandler:把接收到的Message封装为一个ChannelEventRunnable对象,扔给线程池进行处理 +5. ChannelEventRunnable:在ChannelEventRunnable的run方法中会调用DecodeHandler处理Message +6. DecodeHandler:按Dubbo协议的数据格式,解析当前请求的path,versio,方法,方法参数等等,然后把解析好了的请求交给HeaderExchangeHandler +7. HeaderExchangeHandler:处理Request数据,首先构造一个Response对象,然后调用ExchangeHandlerAdapter得到一个CompletionStage future,然后给future通过whenComplete绑定一个回调函数,当future执行完了之后,就可以从回调函数中得到ExchangeHandlerAdapter的执行结果,并把执行结果设置给Response对象,通过channel发送出去。 +8. ExchangeHandlerAdapter:从本机已经导出的Exporter中根据当前Request所对应的服务key,去寻找Exporter对象,从Exporter中得到Invoker,然后执行invoke方法,此Invoker为ProtocolFilterWrapper$CallbackRegistrationInvoker +9. ProtocolFilterWrapper$CallbackRegistrationInvoker:负责执行过滤器链,并且在执行完了之后回调每个过滤器的onResponse或onError方法 +10. EchoFilter:判断当前请求是不是一个回升测试,如果是,则不继续执行过滤器链了(服务实现者Invoker也不会调用了) +11. ClassLoaderFilter:设置当前线程的classloader为当前要执行的服务接口所对应的classloader +12. GenericFilter:把泛化调用发送过来的信息包装为RpcInvocation对象 +13. ContextFilter:设置RpcContext.getContext()的参数 +14. TraceFilter:先执行下一个invoker的invoke方法,调用成功后录调用信息 +15. TimeoutFilter:调用时没有特别处理,只是记录了一下当前时间,当整个filter链都执行完了之后回调TimeoutFilter的onResponse方法时,会判断本次调用是否超过了timeout +16. MonitorFilter:记录当前服务的执行次数 +17. ExceptionFilter:调用时没有特别处理,在回调onResponse方法时,对不同的异常进行处理,详解Dubbo的异常处理 +18. DelegateProviderMetaDataInvoker:过滤器链结束,调用下一个Invoker +19. AbstractProxyInvoker:在服务导出时,根据服务接口,服务实现类对象生成的,它的invoke方法就会执行服务实现类对象的方法,得到结果 + +#### JavassistProxyFactory + +```java +public class JavassistProxyFactory extends AbstractProxyFactory { + + @Override + @SuppressWarnings("unchecked") + public T getProxy(Invoker invoker, Class[] interfaces) { + return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); + } + + @Override + public Invoker getInvoker(T proxy, Class type, URL url) { + + // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' + // 如果现在被代理的对象proxy本身就是一个已经被代理过的对象,那么则取代理类的Wrapper,否则取type(接口)的Wrapper + // Wrapper是针对某个类或某个接口的包装类,通过wrapper对象可以更方便的去执行某个类或某个接口的方法 + final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); + + // proxy是服务实现类 + // type是服务接口 + // url是一个注册中心url,但同时也记录了 + return new AbstractProxyInvoker(proxy, type, url) { + @Override + protected Object doInvoke(T proxy, String methodName, + Class[] parameterTypes, + Object[] arguments) throws Throwable { + + // 执行proxy的method方法 + // 执行的proxy实例的方法 + // 如果没有wrapper,则要通过原生的反射技术去获取Method对象,然后执行 + return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); + } + }; + } + +} +``` + + + + + + + +### 服务消费端执行逻辑 + + + +1. MockClusterInvoker.invoke(new RpcInvocation(method, args)):Mock逻辑 +2. AbstractClusterInvoker.invoke(invocation):把RpcContext中设置的Attachments添加到invocation对象上,调用路由链从服务目录上筛选出适合的服务Invoker,获得服务均衡策略loadbalance +3. FailoverClusterInvoker.doInvoke(invocation, invokers, loadbalance):根据负载均衡策略选出一个invoker,然后执行 +4. InvokerWrapper.invoke(invocation):没做什么事情 +5. CallbackRegistrationInvoker.invoke(invocation):开始执行Filter链,执行完得到结果后,会获取ListenableFilter中的listener,执行listener的onResponse方法 +6. ConsumerContextFilter.invoke(invocation):设置RpcContext中LocalAddress、RemoteAddress、RemoteApplicationName参数 +7. FutureFilter.invoke(invocation): +8. MonitorFilter.invoke(invocation):方法的执行次数+1 +9. ListenerInvokerWrapper.invoke(invocation):没做什么事情 +10. AsyncToSyncInvoker.invoke(invocation):异步转同步,会先用下层Invoker去异步执行,然后阻塞Integer.MAX_VALUE时间,直到拿到了结果 +11. AbstractInvoker.invoke(invocation):主要调用DubboInvoker的doInvoke方法,如果doInvoker方法出现了异常,会进行包装,包装成AsyncRpcResult +12. DubboInvoker.doInvoke(invocation):从clients轮询出一个client进行数据发送,如果配置了不关心结果,则调用ReferenceCountExchangeClient的send方法,否则调用ReferenceCountExchangeClient的request方法 +13. ReferenceCountExchangeClient.request(Object request, int timeout):没做什么事情 +14. HeaderExchangeClient.request(Object request, int timeout):没做什么事情 +15. HeaderExchangeChannel.request(Object request, int timeout):构造一个Request对象,并且会构造一个DefaultFuture对象来阻塞timeout的时间来等待结果,在构造DefaultFuture对象时,会把DefaultFuture对象和req的id存入FUTURES中,FUTURES是一个Map,当HeaderExchangeHandler接收到结果时,会从这个Map中根据id获取到DefaultFuture对象,然后返回Response。 +16. AbstractPeer.send(Object message):从url中获取send参数,默认为false +17. AbstractClient.send(Object message, boolean sent):没做什么 +18. NettyChannel.send(Object message, boolean sent):调用NioSocketChannel的writeAndFlush发送数据,然后判断send如果是true,那么则阻塞url中指定的timeout时间,因为如果send是false,在HeaderExchangeChannel中会阻塞timeout时间 +19. NioSocketChannel.writeAndFlush(Object msg):最底层的Netty非阻塞式的发送数据 + + + +总结一下上面调用流程: + +1. 最外层是Mock逻辑,调用前,调用后进行Mock +2. 从服务目录中,根据当前调用的方法和路由链,筛选出部分服务Invoker(DubboInvoker) +3. 对服务Invoker进行负载均衡,选出一个服务Invoker +4. 执行Filter链 +5. AsyncToSyncInvoker完成异步转同步,因为DubboInvoker的执行是异步非阻塞的,所以如果是同步调用,则会在此处阻塞,知道拿到响应结果 +6. DubboInvoker开始异步非阻塞的调用 +7. HeaderExchangeChannel中会阻塞timeout的时间来等待结果,该timeout就是用户在消费端所配置的timeout + + + + + + + + + +### Dubbo的异常处理 + +当服务消费者在调用一个服务时,服务提供者在执行服务逻辑时可能会出现异常,对于Dubbo来说,服务消费者需要在消费端抛出这个异常,那么这个功能是怎么做到的呢? + + + +服务提供者在执行服务时,如果出现了异常,那么框架会把异常捕获,捕获异常的逻辑在AbstractProxyInvoker中,捕获到异常后,会把异常信息包装为正常的AppResponse对象,只是AppResponse的value属性没有值,exception属性有值。 + + + +此后,服务提供者会把这个AppResponse对象发送给服务消费端,服务消费端是在InvokerInvocationHandler中调用AppResponse的recreate方法重新得到一个结果,在recreate方法中会去失败AppResponse对象是否正常,也就是是否存在exception信息,如果存在,则直接throw这个exception,从而做到**服务执行时出现的异常,在服务消费端抛出**。 + + + +那么这里存在一个问题,如果服务提供者抛出的异常类,在服务消费者这边不存在,那么服务消费者也就抛不出这个异常了,那么dubbo是怎么处理的呢? + + + +这里就涉及到了ExceptionFilter,它是服务提供者端的一个过滤器,它主要是在服务提供者执行完服务后会去识别异常: + +1. 如果是需要开发人员捕获的异常,那么忽略,直接把这个异常返回给消费者 +2. 如果在当前所执行的方法签名上有声明,那么忽略,直接把这个异常返回给消费者 +3. 如果抛出的异常不需要开发人员捕获,或者方法上没有申明,那么服务端或记录一个error日志 +4. 异常类和接口类在同一jar包里,那么忽略,直接把这个异常返回给消费者 +5. 如果异常类是JDK自带的异常,那么忽略,直接把这个异常返回给消费者 +6. 如果异常类是Dubbo自带的异常,那么忽略,直接把这个异常返回给消费者 +7. **否则,把异常信息包装成RuntimeException,并覆盖AppResponse对象中的exception属性** \ No newline at end of file