Files
JavaYouth/docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md

1421 lines
68 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
---
title: 06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析
tags:
- Dubbo
- rpc
categories:
- rpc
- Dubbo源码系列v1
keywords: Dubborpc
description: 服务引入源码解析
cover: 'https://cdn.jsdelivr.net/gh/youthlql/youthlql/img/dubbo.png'
abbrlink: bda15919
date: 2021-11-08 14:11:58
---
## 第六节: Dubbo服务引入源码解析
### 过程
当Spring启动过程中会去给@Reference注解标注了的属性去进行赋值赋值的对象为ReferenceBean中get()方法所返回的对象,这个对象是一个代理对象。
对于ReferenceBean它表示应用想要引入的服务的信息在执行get()时会做如下几步:
1. 调用checkAndUpdateSubConfigs()检查和更新参数和服务提供者类似把ReferenceBean里的属性的值更新为优先级最高的参数值
2. 调用init()去生成代理对象refget()方法会返回这个ref
3. 在生成代理对象ref之前先把消费者所引入服务设置的参数添加到一个map中等会根据这个map中的参数去从注册中心查找服务
4. 把消费者配置的所有注册中心获取出来
1. 如果只有一个注册中心那么直接调用Protocol的refer(interfaceClass, urls.get(0));得到一个Invoker对象
2. 如果有多个注册中心则遍历每个注册中心分别调用Protocol的refer(interfaceClass, url);得到一个Invoker对象添加到invokers中然后把invokers调用CLUSTER.join(new StaticDirectory(u, invokers));封装所有invokers得到一个invoker
5. 把最终得到的invoker对象调用PROXY_FACTORY.getProxy(invoker);得到一个代理对象并返回这个代理对象就是ref
6. 总结上文的Invoker对象表示服务执行者从注册中心refer下来的是一个服务执行者合并invokers后得到的invoker也是一个服务执行者抽象范围更大了
接下来来看Protorol.refer(interfaceClass, url)方法是怎么生成一个Invoker的
1. 首先interfaceClass表示要引入的服务接口url是注册中心的urlregistry://该url中有一个refer参数参数值为当前所要引入服务的参数
2. 调用doRefer(**cluster**, registry, type, url)
3. 在doRefer方法中会生成一个RegistryDirectory
4. 然后获取新版本中的路由器链并添加到RegistryDirectory中去
5. RegistryDirectory监听几个目录注意完成监听器的订阅绑定后**会自动触发一次去获取这些目录上的当前数据**
1. 当前所引入的服务的动态配置目录:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators
2. 当前所引入的服务的提供者目录:/dubbo/org.apache.dubbo.demo.DemoService/providers
3. 当前所引入的服务的老版本动态配置目录:/dubbo/org.apache.dubbo.demo.DemoService/configurators
4. 当前所引入的服务的老版本路由器目录:/dubbo/org.apache.dubbo.demo.DemoService/routers
6. 调用cluster.join(directory)得到一个invoker
7. 返回invoker如果消费者引入了多个group中的服务那么这里返回的是new MergeableClusterInvoker<T>(directory);否则返回的是new FailoverClusterInvoker<T>(directory);
8. 但是上面返回的两个Invoker都会被MockClusterInvoker包装所以最终返回的是MockClusterInvoker。
### Dubbo官方给的Demo
```java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dubbo.demo.consumer.comp;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.dubbo.demo.DemoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component("demoServiceComponent")
public class DemoServiceComponent implements DemoService {
@Reference(version = "1.0.1", group = "youthlql", mock = "fail: return 123")
private DemoService demoService;
@Override
public String sayHello(String name) {
return demoService.sayHello(name); // Invoker
}
}
```
```java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.dubbo.demo.consumer;
import org.apache.dubbo.config.ConsumerConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.apache.dubbo.demo.DemoService;
import org.apache.dubbo.demo.consumer.comp.DemoServiceComponent;
import org.apache.dubbo.rpc.service.EchoService;
import org.springframework.context.annotation.*;
import java.io.IOException;
public class Application {
/**
* In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
* launch the application
*/
public static void main(String[] args) throws IOException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
context.start();
DemoService service = context.getBean("demoServiceComponent", DemoServiceComponent.class);
System.out.println("开始调用");
String hello = service.sayHello("world");
System.out.println("result :" + hello);
System.in.read();
}
@Configuration
@EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.consumer.comp")
@PropertySource("classpath:/spring/dubbo-consumer.properties")
@ComponentScan(value = {"org.apache.dubbo.demo.consumer.comp"})
static class ConsumerConfiguration {
@Bean
public ConsumerConfig consumerConfig() {
return new ConsumerConfig();
}
}
}
```
### 服务引入过程概述
#### 什么是服务引入
1. 首先服务引入的意思在Spring启动的时候去扫描那个@Reference注解标注的Bean把它注入到容器中并且去构建一下服务真正调用时候的一些必要东西比如调用哪个IP哪个端口的哪个服务实例这样的东西叫做服务目录。
2. 注意服务目录是需要提前构建到本地,不能等你调用的时候再去注册中心获取,这样会很慢。从这个角度来说,服务目录相当于把注册中心上关于服务提供者的信息提前缓存到本地,只不过做了一些转化。
3. 总的来说,服务引入的目的就是构建服务目录
4. 每一个服务提供者的接口都有一个服务目录,比如我们用@Reference标记了DemoService和MyService两个服务提供者,那么这两个服务提供者都有自己的服务目录
消费端每个服务对应一个服务目录RegistryDirectory。
一个服务目录中包含了:
1. serviceType表示服务接口
2. serviceKey表示引入的服务keyserviceclass+version+group
3. queryMap表示引入的服务的参数配置
4. configurators动态配置
5. routerChain路由链
6. invokers表示服务目录当前缓存的服务提供者Invoker
7. ConsumerConfigurationListener监听本应用的动态配置
8. ReferenceConfigurationListener监听所引入的服务的动态配置
#### 大致的步骤
1. 解析@Reference注解上的一些配置生成URL
2. 监听注册中心并拉取注册中心上相关的配置。
1. 服务启动时的-D参数并且从注册中心上拉取web端的动态配置。这些参数配置根据优先级顺序覆盖掉注册中心上服务提供者的配置。优先级的话前面也讲过web端的动态配置 > -D参数 > @Reference注解 > @Services生成的提供者URL
2. 注册中心上也包括了路由链的配置
3. 监听。首先服务目录就是把注册中心上的信息缓存到了本地,所以如果注册中心上信息变了,本地的服务目录也要随之更新,也就是需要监听。
3. 把上面几步整合的URL转成`List<DubboInvoker>``List<DubboInvoker>`才是真正要用的
#### 监听
在服务消费端有几个监听器:
1. ConsumerConfigurationListener监听本应用的动态配置当应用的动态配置发生了修改后会调用RegistryDirectory的refreshInvoker()方法,对应的路径为:**"/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators"**
2. ReferenceConfigurationListener监听所引入的服务的动态配置当服务的动态配置发生了修改后会调用RegistryDirectory的refreshInvoker()方法,对应的路径为:**"/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators"**
3. RegistryDirectoryRegistryDirectory本身也是一个监听器它会监听所引入的服务提供者、服务动态配置老版本、服务路由路径分别为
1. **"/dubbo/org.apache.dubbo.demo.DemoService/providers"**
2. **"/dubbo/org.apache.dubbo.demo.DemoService/configurators"**
3. **"/dubbo/org.apache.dubbo.demo.DemoService/routers"**
4. 路由器Router每个Router自己本身也是一个监听器负责监听对应的路径
1. AppRouter应用路由监听的路径为**"/dubbo/config/dubbo/dubbo-demo-consumer-application.condition-router"**
2. ServiceRouter: 服务路由,监听的路径为**"/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.condition-router"**
3. TagRouter: 标签路由标签路由和应用路由、服务路由有所区别应用路由和服务路由都是在消费者启动在构造路由链时会进行监听器的绑定但是标签路由不是消费者启动的时候绑定监听器的是在引入服务时获取到服务的提供者URL之后才会去监听.tag-router节点中的内容监听的路径为**"/dubbo/config/dubbo/dubbo-demo-provider-application.tag-router"**
5. 当ConsumerConfigurationListener接收到了消费者应用的动态配置数据变化后会调用当前消费者应用中的所有RegistryDirectory的refreshInvoker()方法表示刷新消费者应用中引入的每个服务对应的Invoker
6. 当ReferenceConfigurationListener接收到了某个服务的动态配置数据变化后会调用该服务对应的RegistryDirectory的refreshInvoker()方法表示刷新该服务对应的Invoker
7. 当AppRouter和ServiceRouter接收到条件路由的数据变化后就会更新Router内部的routerRule和conditionRouters属性。这两个属性在服务调用过程中会用到。
8. 当TagRouter接收到标签路由的数据变化后就会更新TagRouter内部的tagRouterRule的属性这个属性在服务调用过程中会用到。
9. 当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/configurators"节点数据变化后,会生成**configurators**
10. 当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/**routers**"节点数据变化后会生成Router并添加到**routerChain中**
11. 当RegistryDirectory接收到"/dubbo/org.apache.dubbo.demo.DemoService/**providers**"节点数据变化后会调用refreshOverrideAndInvoker()方法。这个方法就是用来针对每个服务提供者来生成Invoker的。
1. refreshOverrideAndInvoker方法中首先调用overrideDirectoryUrl()方法利用Configurators重写目录地址目录地址是这样的`zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-consumer-application&dubbo=2.0.2&group=g1&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=49964&register.ip=192.168.40.17&release=2.7.0&revision=1.1.1&side=consumer&sticky=false&timestamp=1591339005022&version=1.1.1`在注册中心URL基础上把当前引入服务的参数作为URL的Parameters所以这个地址既包括了注册中心的信息也包括了当前引入服务的信息
2. 利用老版本configuratorsConsumer应用的configurators引入的服务的configurators去重写目录地址。
3. 重写往目录地址后调用refreshInvoker(urls)方法去刷新Invoker
4. 在refreshInvoker(urls)方法中会把从注册中心获取到的providers节点下的服务URL调用toInvokers(invokerUrls)方法得到Invoker
5. 先按Protocol进行过滤并且调用DubboProtocol.refer方法得到Invoker
6. 将得到的invokers设置到RouterChain上并且调用RouterChain上所有的**routers的**notify(**invokers**)方法实际上这里只有TagRouter的notify方法有用
7. 再把属于同一个group中的invoker合并起来
8. 这样Invoker就生成好了
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/Dubbo监听架构图.png"/>
### 源码分析-解析@Reference注解上的配置
#### ReferenceBean
```java
/**
* 1.这里是入口方法,Spring容器在启动时就会先生成ReferenceBean对象接着会调用{@link ReferenceConfig#get()}
* 2.意思就是当你标注了注解@ReferenceSpring启动扫描bean的时候会调用此类重写的
* {@link ReferenceBean#afterPropertiesSet()},最终会调到get()返回一个代理对象给那个bean
* 3.也就是说在调用此方法之前ReferenceBean对象就已经生成了里面的属性值也都有了。
*/
@Override
public Object getObject() {
// 这里调用ReferenceConfig#get()
return get();
}
public void afterPropertiesSet() throws Exception {
//前面代码太长了,省略...
if (shouldInit()) {
getObject();
}
```
#### ReferenceConfig
```java
public synchronized T get() {
checkAndUpdateSubConfigs();
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
/**
* 前面讲过{@link ReferenceConfig#ref}属性就是代理对象
*/
if (ref == null) {
// 入口,这里就是核心了
init();
}
return ref; // 返回的是Invoke代理
}
private void init() {
if (initialized) {
return;
}
//代码太长省略了,主要就是跟服务导出一样,准备参数。比如@Reference注解里的属性还有properties的配置参数等
// 关键
ref = createProxy(map);
String serviceKey = URL.buildKey(interfaceName, group, version);
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
initialized = true;
}
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
// injvm://
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
// 为什么会有urls因为可以在@Reference的url属性中配置多个url可以是点对点的服务地址也可以是注册中心的地址
urls.clear(); // reference retry init will add url to urls, lead to OOM
// @Reference中指定了url属性
// @Reference上标注的属性是在Spring启动时就扫描进了相应的属性里所以这里如果配置了就会有
// 前面讲过扫描到这些东西赋值给对应的属性
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url); // 用;号切分
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
// 如果是注册中心地址则在url中添加一个refer参数
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// map表示消费者端配置的参数
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 如果是服务地址
// 有可能url中配置了参数map中表示的服务消费者消费服务时的参数所以需要合并
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// @Reference中的protocol属性表示使用哪个协议调用服务如果不是本地调用协议injvm://,则把注册中心地址找出来
// 对于injvm://协议已经在之前的逻辑中就已经生成invoke了
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
// 加载注册中心地址
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 对于注册中心地址都添加REFER_KEY
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
/**
* 1.如果URL是registry表明这是注册中心相关的调用进来的一般来说注册中心只有一个
* 就会进入这个方法
* 2.下面还是会用到SPI机制就是首先调用Protocol接口的两个wrapper包装。debug时候的顺序是
* ProtocolListenerWrapper -> ProtocolFilterWrapper
* 3.然后根据SPI机制由于URL是Registry,就调用RegistryProtocol.refer()
*/
if (urls.size() == 1) {
// RegistryProtocol.refer() 或者 DubboProtocol.refer()
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
// MockClusterInvoker-->FailoverClusterInvoker-->RegistryDirectory
// --->RegistryDirectory$InvokerDelegate-->ListenerInvokerWrapper-->ProtocolFilterWrapper$CallbackRegistrationInvoker-->ConsumerContextFilter-->FutureFilter-->MonitorFilter-->AsyncToSyncInvoker-->DubboInvoker
// --->RegistryDirectory$InvokerDelegate-->ListenerInvokerWrapper-->ProtocolFilterWrapper$CallbackRegistrationInvoker-->ConsumerContextFilter-->FutureFilter-->MonitorFilter-->AsyncToSyncInvoker-->DubboInvoker
} else {
// 如果有多个url
// 1. 根据每个urlrefer得到对应的invoker
// 2. 如果这多个urls中存在注册中心url则把所有invoker整合为RegistryAwareClusterInvoker该Invoker在调用时
// 会查看所有Invoker中是否有默认的如果有则使用默认的Invoker如果没有则使用第一个Invoker
// 3. 如果这多个urls中不存在注册中心url则把所有invoker整合为FailoverCluster
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null; // 用来记录urls中最后一个注册中心url
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 如果存在注册中心地址
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's CLUSTER is available
URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
// StaticDirectory表示静态服务目录里面的invokers是不会变的, 生成一个RegistryAwareCluster
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
/**
*
* {@link org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper#join(Directory)}
* {@link org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker}
* {@link RegistryAwareCluster}
* {@link org.apache.dubbo.rpc.cluster.support.RegistryAwareClusterInvoker}
* 1.这里会先调用MockClusterWrapper#join生成一个MockClusterInvoker(为什么先调用wrapper前面讲过)
* 2.接着在MockClusterWrapper#join方法中会继续调用join方法通过SPI机制实际调用的是
* RegistryAwareCluster生成RegistryAwareClusterInvoker
*
*/
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
// 如果不存在注册中心地址, 生成一个FailoverClusterInvoker
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
}
if (shouldCheck() && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker);
}
```
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210915224055718.png"/>
Dubbo官方给的Demo没有配置URL所以这里就是NULL
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210920151657492.png"/>
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210920151857950.png" />
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210920152003415.png"/>
#### MockClusterWrapper
```java
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
}
```
#### RegistryAwareCluster
```java
public class RegistryAwareCluster implements Cluster {
public final static String NAME = "registryaware";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new RegistryAwareClusterInvoker<T>(directory);
}
}
```
#### RegistryAwareClusterInvoker
```java
public class RegistryAwareClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(RegistryAwareClusterInvoker.class);
public RegistryAwareClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key.
// 如果在引入服务时存在多个invoker并且某个invoker是default的则在调用时会使用该invoker不会调用其它invoker
//就比如说配置文件写了两个注册中心,然后会生成
/**
* 1.如果在引入服务时存在多个invoker并且某个invoker是default的
* 则在调用时会使用该invoker不会调用其它invoker
* 2.比如说配置文件写了两个注册中心一个redis和一个zookeeper
*/
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable() && invoker.getUrl().getParameter(REGISTRY_KEY + "." + DEFAULT_KEY, false)) {
return invoker.invoke(invocation);
}
}
// If none of the invokers has a local signal, pick the first one available.
// 如果没有default则取第一个
for (Invoker<T> invoker : invokers) {
// 如果对应的注册中心中没有当前调用的服务信息,则不可用
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
}
```
### 源码分析-监听注册中心并第一次拉取provider配置
#### RegistryProtocol
```java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 从registry://的url中获取对应的注册中心比如zookeeper
// url由 registry:// 改变为---> zookeeper://
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
// 拿到注册中心实现ZookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
// 下面这个代码通过过git历史提交记录是用来解决SimpleRegistry不可用的问题不用管这里
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// qs表示 queryString, 表示url中的参数表示消费者引入服务时@Reference所配置的参数
// 就是把URL中的参数变成kv形式的map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// group="a,b" or group="*"
// https://dubbo.apache.org/zh/docs/v2.7/user/examples/group-merger/
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// group有多个值这里的cluster为MergeableCluster
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 这里的cluster是cluster的Adaptive对象,扩展点
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// RegistryDirectory表示动态服务目录会和注册中心的数据保持同步
// type表示一个服务对应一个RegistryDirectoryurl表示注册中心地址
// 在消费端最核心的就是RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
// 引入服务所配置的参数
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 消费者url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 注册简化后的消费url
registry.register(directory.getRegisteredConsumerUrl());
}
// 构造路由链,路由链会在引入服务时按路由条件进行过滤
// 路由链是动态服务目录中的一个属性,通过路由链可以过滤某些服务提供者
directory.buildRouterChain(subscribeUrl);
/**
* 1.服务目录需要订阅监听的几个路径
* 当前应用所对应的动态配置目录:/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators
* 当前所引入的服务的动态配置目录:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators
* 当前所引入的服务的提供者目录:/dubbo/org.apache.dubbo.demo.DemoService/providers
* 当前所引入的服务的老版本动态配置目录:/dubbo/org.apache.dubbo.demo.DemoService/configurators
* 当前所引入的服务的老版本路由器目录:/dubbo/org.apache.dubbo.demo.DemoService/routers
* 这里由于Dubbo新老版本的兼容导致监听路径看着有点乱
* 2.在订阅监听指定路径这一步也会直接把相应路径下的配置拉下来,
*/
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 利用传进来的clusterjoin得到invoker, MockClusterWrapper
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
```
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210920152813300.png"/>
#### RegistryDirectory
```java
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 监听consumer应用
serviceConfigurationListener = new ReferenceConfigurationListener(this, url); // 监听所引入的服务的动态配置
// 这里又是SPI会先调用FailbackRegistry
registry.subscribe(url, this);
}
```
1. 这里再讲一下为什么这里会先调用FailbackRegistry。
2. 这里的SPI机制是根据this对象的registry属性来决定的this是RegistryDirectory对象RegistryDirectory实现了NotifyListener接口
3. 看下面的截图registry属性是zookeeper的URL所以应该是要调用ZookeeperRegistry的subscribe()方法但是ZookeeperRegistry没有这个方法所以我们就要找它的父类了也就是FailbackRegistry
4. 然后再调用doSubscribe()ZookeeperRegistry重写了此方法很明显这是个模板模式。
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210920154023711.png" />
#### FailbackRegistry
```java
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
// 添加listener向zk添加监听器时如果报错了那么会把这个listener添加到failedSubscribed中并会定时重试重新注册listener
addFailedSubscribed(url, listener);
}
}
```
#### ZookeeperRegistry
```java
// 进行订阅先看父类的subscribe方法
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
// 订阅所有服务
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
// 单独订阅某一个服务
List<URL> urls = new ArrayList<>();
// 得到真正要监听的zk上的路径,
for (String path : toCategoriesPath(url)) {
// 根据监听地址去拿listeners如果没有则生成
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
// 一个NotifyListener对应一个ChildListener
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// lambda表达式就是监听逻辑 parentPath表示父path,currentChilds表示当前拥有的child, 会调用notify方法进行实际的处理
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
// 创建zk上路径
zkClient.create(path, false);
// 添加真正跟zk相关的ChildListener,ChildListener中的逻辑就是监听到zk上数据发生了变化后会触发的逻辑
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 这里的urls就是从现在所引入的服务的目录下先主动拉一次配置比如下面这个三个目录下的路径
// "/dubbo/org.apache.dubbo.demo.DemoService/providers"
// "/dubbo/org.apache.dubbo.demo.DemoService/configurators"
// "/dubbo/org.apache.dubbo.demo.DemoService/routers"
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
```
#### FailbackRegistry
这里再调用父类的通知方法,先主动把注册中心上的配置拉下来一次
```java
/**
* 接收到通知,处理通知的方法
* @param url 被监听的url
* @param listener 监听器
* @param urls 要么有一个empty://要么有一个或多个override://协议
*/
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// 处理通知失败
// Record a failed registration request to a failed list, retry regularly
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
```
#### AbstractRegistry
```java
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//从这里进
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}
```
最终走到了这一步
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210920154648609.png" />
#### RegistryDirectory
```java
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
// 获取动态配置URL生成configurators
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 获取老版本路由URL生成Router并添加到路由链中
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 获取服务提供者URL
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
refreshInvoker(urls);
}
private void refreshInvoker(List<URL> invokerUrls) { //http:// dubbo://
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// 这里会先按Protocol进行过滤并且调用DubboProtocol.refer方法得到DubboInvoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
// 得到了所引入的服务Invoker之后把它们设置到路由链中去在调用时使用并且会调用TagRouter的notify方法
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
// 遍历当前服务所有的服务提供者URL
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
// 当前消费者如果手动配置了Protocol那么则进行匹配
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 当前Protocol是否在应用中存在对应的扩展点
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
//重要
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
// 如果当前服务提供者URL没有生产过Invoker
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
// 调用Protocol的refer方法得到一个Invoker DubboProtocol.refer()
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
// 这里就是根据优先级把动态配置和服务提供者URL等等还有其它一些配置URL合并得到真正能转换成DubboInvoker的URL
private URL mergeUrl(URL providerUrl) {
providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
//动态配置的内容去覆盖
providerUrl = overrideWithConfigurator(providerUrl);
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
// The combination of directoryUrl and override is at the end of notify, which can't be handled here
this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
if ((providerUrl.getPath() == null || providerUrl.getPath()
.length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0
//fix by tony.chenl DUBBO-44
String path = directoryUrl.getParameter(INTERFACE_KEY);
if (path != null) {
int i = path.indexOf('/');
if (i >= 0) {
path = path.substring(i + 1);
}
i = path.lastIndexOf(':');
if (i >= 0) {
path = path.substring(0, i);
}
providerUrl = providerUrl.setPath(path);
}
}
return providerUrl;
}
```
### 源码分析-路由链生成
#### 构造路由链概述
1. RouterChain.buildChain(url)方法赋值得到路由链。这里的url是这样的
```html
consumer://192.168.0.100/org.apache.dubbo.demo.DemoService?application=dubbo-demo-consumer-application
&dubbo=2.0.2&group=g1&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=19852
&release=2.7.0&revision=1.1.1&side=consumer&sticky=false&timestamp=1591332529643&version=1.1.1
```
2. 表示所引入的服务的参数在获得路由链时就要根据这些参数去匹配得到符合当前的服务的Router.
**大致源码过程:**
1. RouterChain.buildChain(url)
2. new RouterChain<>(url)
3. List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class).getActivateExtension(url, (String\[\]) null);根据url去获取可用的RouterFactory可以拿到四个
1. MockRouterFactoryMock路由没有order相当于order=0
2. TagRouterFactory: 标签路由order = 100
3. AppRouterFactory: 应用条件路由order = 200
4. ServiceRouterFactory: 服务条件路由order = 300
4. 遍历每个RouterFactory调用getRouter(url)方法得到Router存到List<Router> routers中
5. 对routers按order从小到大的顺序进行排序
**文字描述:**
1. AppRouter和ServiceRouter是非常类似他们的父类都是ListenableRouter在创建AppRouter和ServiceRouter时会绑定一个监听器比如
1. AppRouter监听的是/dubbo/config/dubbo/dubbo-demo-consumer-application.condition-router节点的内容
2. ServiceRouter监听的是/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.condition-router节点的内容
2. 绑定完监听器之后会主动去获取一下对应节点的内容也就是所配置的路由规则内容然后会去解析内容得到ConditionRouterRule routerRule再调用generateConditions(routerRule);方法解析出一个或多个ConditionRouter并存入到List<ConditionRouter> conditionRouters中。
3. 注意routerRule和conditionRouters是ListenableRouter的属性就是在AppRouter和ServiceRouter中的。
4. 对于TagRouter就比较特殊首先标签路由是用在当消费者在调用某个服务时通过在请求中设置标签然后根据所设置的标签获得可用的服务提供者地址。而且目前TagRouter只支持应用级别的配置(而且是服务提供者应用,给某个服务提供者应用打标)。
5. 所以对于服务消费者而言,在引用某个服务时,需要知道提供这个服务的应用名,然后去监听这个应用名对应的.tag-router节点内容比如/dubbo/config/dubbo/dubbo-demo-provider-application.tag-router。
6. 那么问题来了怎么才能知道提供这个服务的服务提供者的应用名呢答案是需要先获取到当前所引入服务的服务提供者URL从URL中得到服务提供者的应用名。拿到应用名之后才能去应用名对应的.tag-router节点去绑定监听器。
7. 这就是TagRouter和AppRouter、ServiceRouter的区别对于AppRouter而言监听的是本消费者应用的路由规则对于ServiceRouter而言监听的是所引入服务的路由规则都比较简单。
8. 所以TagRouter是在引入服务时获取到服务的提供者URL之后才会去监听.tag-router节点中的内容并手动获取一次节点中的内容设置TagRouter对象中tagRouterRule属性表示标签路由规则。
到此,路由链构造完毕。
<img src="https://unpkg.zhimg.com/youthlql@1.0.4/rpc/dubbo/v1/06_di_liu_jie/image-20210919190129223.png"/>
RegistryProtocol
```java
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// RegistryDirectory表示动态服务目录会和注册中心的数据保持同步
// type表示一个服务对应一个RegistryDirectoryurl表示注册中心地址
// 在消费端最核心的就是RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
// 引入服务所配置的参数
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 消费者url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 注册简化后的消费url
registry.register(directory.getRegisteredConsumerUrl());
}
// 构造路由链,路由链会在引入服务时按路由条件进行过滤
// 路由链是动态服务目录中的一个属性,通过路由链可以过滤某些服务提供者
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// 利用传进来的clusterjoin得到invoker, MockClusterWrapper
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
```
#### RegistryDirectory
```java
public void buildRouterChain(URL url) {
this.setRouterChain(RouterChain.buildChain(url));
}
```
#### RouterChain
```java
public static <T> RouterChain<T> buildChain(URL url) {
return new RouterChain<>(url);
}
private RouterChain(URL url) {
// 拿到RouterFactory接口有哪些扩展实现类比如默认情况下就有四个
// 0 = {MockRouterFactory@2880}
// 1 = {TagRouterFactory@2881} // 标签路由
// 2 = {AppRouterFactory@2882} // 应用条件路由
// 3 = {ServiceRouterFactory@2883} // 服务条件路由
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);
// 然后利用RouterFactory根据url生成各个类型的Router
// 这里生产的routers已经是真实可用的了但是有个比较特殊的
// 对于应用条件路由和服务条件路由对于的Router对象对象内部已经有真实可用的数据了数据已经从配置中心得到了
// 但是对于标签路由则没有,它暂时还相当于一个没有内容的对象(还没有从配置中心获取标签路由的数据)
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());
// 把routers按priority进行排序
initWithRouters(routers);
}
public void setInvokers(List<Invoker<T>> invokers) {
this.invokers = (invokers == null ? Collections.emptyList() : invokers);
routers.forEach(router -> router.notify(this.invokers));
}
```
#### AppRouterFactory
> 以AppRouterFactory为例
```java
@Activate(order = 200)
public class AppRouterFactory implements RouterFactory {
public static final String NAME = "app";
private volatile Router router;
@Override
public Router getRouter(URL url) {
if (router != null) {
return router;
}
synchronized (this) {
if (router == null) {
router = createRouter(url);
}
}
return router;
}
private Router createRouter(URL url) {
// 内部会进行初始化
return new AppRouter(DynamicConfiguration.getDynamicConfiguration(), url);
}
}
```
#### AppRouter
```java
public class AppRouter extends ListenableRouter {
public static final String NAME = "APP_ROUTER";
/**
* AppRouter should after ServiceRouter
*/
private static final int APP_ROUTER_DEFAULT_PRIORITY = 150;
public AppRouter(DynamicConfiguration configuration, URL url) {
// 拿到应用名
super(configuration, url, url.getParameter(CommonConstants.APPLICATION_KEY));
this.priority = APP_ROUTER_DEFAULT_PRIORITY;
}
}
```
#### ListenableRouter
```java
public ListenableRouter(DynamicConfiguration configuration, URL url, String ruleKey) {
super(configuration, url);
this.force = false;
// ruleKey为服务名或应用名
// 初始化,会绑定一个监听器,负责监听配置中心条件路由的修改,并且会主动从配置中心获取一下当前条件路由的数据并做解析
this.init(ruleKey);
}
private synchronized void init(String ruleKey) {
if (StringUtils.isEmpty(ruleKey)) {
return;
}
// 服务名+".condition-router",或 应用名+".condition-router"
String routerKey = ruleKey + RULE_SUFFIX;
// 绑定一个监听器去监听routerKey对应的路径当前类ListenableRouter就自带了一个监听器
configuration.addListener(routerKey, this);
// 绑定完监听器后,主动的从配置中心获取一下当前服务或消费者应用的对应的路由配置
String rule = configuration.getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP);
if (StringUtils.isNotEmpty(rule)) {
// 手动调用监听器处理事件的方法process()
this.process(new ConfigChangeEvent(routerKey, rule));
}
}
public synchronized void process(ConfigChangeEvent event) {
if (logger.isInfoEnabled()) {
logger.info("Notification of condition rule, change type is: " + event.getChangeType() +
", raw rule is:\n " + event.getValue());
}
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
// 如果是一个删除事件则清空当前Router中的conditionRouters属性表示当前Router对象中没有路由规则
routerRule = null;
conditionRouters = Collections.emptyList();
} else {
try {
// 解析路由规则
routerRule = ConditionRuleParser.parse(event.getValue());
// 根据路由规则生成ConditionRouter-条件路由对象并赋值给当前Router对象的conditionRouters属性
generateConditions(routerRule);
} catch (Exception e) {
logger.error("Failed to parse the raw condition rule and it will not take effect, please check " +
"if the condition rule matches with the template, the raw rule is:\n " + event.getValue(), e);
}
}
}
```
### Invoker总结
**MockClusterInvoker** 完成Mock功能由MockClusterWrapper生成MockClusterWrapper是Cluster接口的包装类通过Cluster.join()方法得到MockClusterInvoker
**FailoverClusterInvoker**完成集群容错功能是MockClusterInvoker的下级
**RegistryAwareClusterInvoker**如果指定了多个注册中心那么RegistryAwareClusterInvoker完成选择默认的注册中心的进行调用如果没有指定默认的则会遍历注册中心进行调用如果该注册中心没有对应的服务则跳过。
**DubboInvoker**完成Dubbo协议底层发送数据
**ProtocolFilterWrapper$CallbackRegistrationInvoker**完成对filter的调用ProtocolFilterWrapper是Protocol接口的包装类通过Protocol.refer()方法得到CallbackRegistrationInvoke。
### DubboProtocol的服务引入Refer
DubboProtocol中并没有refer方法是在它的父类AbstractProtocol中才有的refer方法
```java
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 异步转同步Invoker , type是接口url是服务地址
// DubboInvoker是异步的而AsyncToSyncInvoker会封装为同步的
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
```
调用protocolBindingRefer()方法得到一个Invoker后会包装为一个AsyncToSyncInvoker然后作为refer方法的结果返回。
在DubboProtocol的protocolBindingRefer()方法中会new一个DubboInvoker然后就返回了。
在构造DubboInvoker时有一个非常重要的步骤构造clients。DubboInvoker作为消费端服务的执行者在调用服务时是需要去发送Invocation请求的而发送请求就需要client之所以有多个client是因为DubboProtocol支持多个。
假如在一个DubboInvoker中有多个Client那么在使用这个DubboInvoker去调用服务时就可以提高效率比如一个服务接口有多个方法那么在业务代码中可能会不断的调用该接口中的方法并且由于DubboProtocol底层会使用异步去发送请求所以在每次需要发送请求时就可以从clients轮询一个client去发送这个数据从而提高效率。
接下来来看看clients是如何生成的。
1. 首先一个DubboInvoker到底支持多少个Client呢这是可以配置的参数为connections按指定的数字调用initClient(url)得到ExchangeClient。
2. initClient(url)的实现逻辑为
1. 获取client参数表示是用netty还是mina等等
2. 获取**codec参数表示数据的编码方式**
3. **获取****heartbeat参数表示长连接的心跳时间超过这个时间服务端没有收到数据则关闭socket默认为1分钟**
4. **如果所指定的client没有对应的扩展点则抛异常**
5. 获取**lazy参数默认为false如果为true那么则直接返回一个**LazyConnectExchangeClient表示真正在发送数据时才建立socket
6. 否则调用Exchangers._connect_(url, **requestHandler**)获得一个client
7. 在connect()方法中调用HeaderExchanger的connect方法去建立socket连接并得到一个HeaderExchangeClient
8. 在构造HeaderExchangeClient时需要先执行Transporters._connect_()方法得到一个Client
9. 会调用NettyTransporter的connect()去构造一个NettyClient
10. 在构造NettyClient的过程中会去初始化Netty的客户端然后连接Server端建立一个Socket连接
### 最复杂情况下的Invoker链
```java
@Reference(url = "dubbo://192.168.40.17:20881/org.apache.dubbo.demo.DemoService;registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper")
private DemoService demoService;
```
@Reference注解上定义了url参数,有两个值
1. dubbo://192.168.40.17:20881/org.apache.dubbo.demo.DemoService
2. registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper
最终refer处理的invoker链路为
* MockClusterInvoker
* invoker=RegistryAwareClusterInvoker
* directory=StaticDirectory
* 0=**ProtocolFilterWrapper$CallbackRegistrationInvoke子流程**
* 1=MockClusterInvoker
* FailoverClusterInvoker
* RegistryDirectory
* invokers=UnmodifiableRandomAccessList size=1
* 0=RegistryDirectory$InvokerDelegate
* **ProtocolFilterWrapper$CallbackRegistrationInvoke子流程**
* **ProtocolFilterWrapper$CallbackRegistrationInvoke子流程**
* filterInvoker=ProtocolFilterWrapper$1
* filter=ConsumerContextFilter
* next=ProtocolFilterWrapper$1
* filter=FutureFilter
* next=ProtocolFilterWrapper$1
* filter=MonitorFilter
* next=ListenerInvokerWrapper
* invoker=AsyncToSyncInvoker
* invoker=DubboInvoker