--- title: 05.Dubbo源码系列V1-Dubbo第五节-服务导出源码解析 tags: - Dubbo - rpc categories: - rpc - Dubbo源码系列v1 keywords: Dubbo,rpc description: Dubbo服务导出源码解析 cover: 'https://npm.elemecdn.com/lql_static@latest/logo/dubbo.png' abbrlink: '48141866' date: 2021-10-06 14:11:58 --- ## 第五节: Dubbo服务注册(导出)源码解析 ### 笔记更新地址: [https://www.yuque.com/books/share/f2394ae6-381b-4f44-819e-c231b39c1497](https://www.yuque.com/books/share/f2394ae6-381b-4f44-819e-c231b39c1497?#)(密码:kyys) 《Dubbo笔记》 ### 服务导出原理概述 1. 服务导出的入口为ServiceBean中的export()方法,当Spring启动完之后,通过接收Spring的ContextRefreshedEvent事件来触发export()方法的执行。 2. 一个ServiceBean对象就表示一个Dubbo服务,ServiceBean对象中的参数就表示服务的参数,比如timeout,该对象的参数值来至@Service注解中所定义的。 3. 服务导出主要得做两件事情: 1. 根据服务的参数信息,启动对应的网络服务器(netty、tomcat、jetty等),用来接收网络请求 2. 将服务的信息注册到注册中心 4. 但是在做这两件事情之前得先把服务的参数确定好,因为一个Dubbo服务的参数,除开可以在@Service注解中去配置,还会继承Dubbo服务所属应用(Application)上的配置,还可以在配置中心或JVM环境变量中去配置某个服务的参数,所以首先要做的是确定好当前服务最终的(优先级最高)的参数值。 5. 确定好服务参数之后,就根据所配置的协议启动对应的网络服务器。在启动网络服务器时,并且在网络服务器接收请求的过程中,都可以从服务参数中获取信息,比如最大连接数,线程数,socket超时时间等等。 6. 启动完网络服务器之后,就将服务信息注册到注册中心。同时还有向注册中心注册监听器,监听Dubbo的中的动态配置信息变更。 > 服务导出就是服务注册的意思 ### 服务概念的演化 1. DemoService接口表示一个服务,此时的服务表示服务定义 2. DemoServiceImpl表示DemoService服务的具体实现,此时的服务表示服务的具体实现 3. DemoService+group+version表示一个服务,此时的服务增加了分组和版本概念 4. [http://192.168.1.112:80/com.tuling.DemoService](http://192.168.1.112:80/com.luban.DemoService)表示一个服务,此时的服务增加了机器IP和Port,表示远程机器可以访问这个URL来使用com.tuling.DemoService这个服务 5. [http://192.168.1.112:80/com.tuling.DemoService](http://192.168.1.112:80/com.luban.DemoService)?timeout=3000&version=1.0.1&application=dubbo-demo-provider-application表示一个服务,此时的服务是拥有参数的,比如超时时间、版本号、所属应用 在dubbo中就是用的最后一种方式来表示服务的。 ### 服务导出思想 服务导出要做的几件事情: 1. 确定服务的参数 2. 确定服务支持的协议 2. 构造服务最终的URL 2. 根据服务支持的不同协议,启动不同的Server,用来接收和处理请求 3. 将服务URL注册到注册中心去 4. 因为Dubbo支持动态配置服务参数,所以服务导出时还需要绑定一个监听器Listener来监听服务的参数是否有修改,如果发现有修改,则需要重新进行导出 ### 确定服务的参数 #### 确定服务的参数概述 - 在执行ServiceConfig.export()时,此时ServiceConfig对象就代表一个服务(也可以说ServiceBena代表一个服务,因为本来就是继承关系),我们已经知道了这个服务的名字(就是服务提供者接口的名字),并且此时这个服务可能已经有一些参数了,就是**@Service注解上所定义的参数**。 - 但是在Dubbo中,除开可以在@Service注解中给服务配置参数,还有很多地方也可以给服务配置参数,比如: - dubbo.properties文件,你可以建立这个文件,dubbo会去读取这个文件的内容作为服务的参数,Dubob的源码中叫做**PropertiesConfiguration** - 配置中心,dubbo在2.7版本后就支持了分布式配置中心,你可以在Dubbo-Admin中去操作配置中心,分布式配置中心就相当于一个远程的dubbo.properties文件,你可以在Dubbo-Admin中去修改这个dubbo.properties文件,当然配置中心支持按应用进行配置,也可以按全局进行配置两种,在Dubbo的源码中**AppExternalConfiguration**表示应用配置,**ExternalConfiguration**表示全局配置。 - 系统环境变量,你可以在启动应用程序时,通过-D的方式来指定参数,在Dubbo的源码中叫**SystemConfiguration** - 再加上通过@Service注解所配置的参数,在Dubbo的源码中叫**AbstractConfig** - 服务的参数可以从这四个位置来,这四个位置上如果配了同一个参数的话,优先级从高到低有两种情况: - SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration - SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration - 在服务导出时,首先得确定服务的参数。当然,服务的参数除开来自于服务的自身配置外,还可以来自其**上级**。比如如果服务本身没有配置timeout参数,但是如果服务所属的应用的配置了timeout,那么这个应用下的服务都会继承这个timeout配置。**所以在确定服务参数时,需要先从上级获取参数,获取之后,如果服务本身配置了相同的参数,那么则进行覆盖。** #### 确定服务的参数源码 ##### ServiceBean ```java //当Spring启动完之后,通过接收Spring的ContextRefreshedEvent事件来触发export()方法的执行。 @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 当前服务没有被导出并且没有卸载,才导出服务 if (!isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 服务导出(服务注册) export(); } } @Override public void export() { //调用ServiceConfig#export() super.export(); // Publish ServiceBeanExportedEvent // Spring启动完发布ContextRefreshedEvent事件--->服务导出--->发布ServiceBeanExportedEvent // 程序员可以通过Spring中的ApplicationListener来监听服务导出是否完成 publishExportEvent(); } private void publishExportEvent() { //监听这个事件就可以知道Dubbo的服务有没有注册完成 ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this); applicationEventPublisher.publishEvent(exportEvent); } ``` ##### ServiceConfig ```java public synchronized void export() { //读取服务配置 checkAndUpdateSubConfigs(); // 检查服务是否需要导出 if (!shouldExport()) { return; } // 检查是否需要延迟发布 if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // 导出服务 doExport(); } } public void checkAndUpdateSubConfigs() { // Use default configs defined explicitly on global configs // ServiceConfig中的某些属性如果是空的,那么就从ProviderConfig、ModuleConfig、ApplicationConfig中获取 // 补全ServiceConfig中的属性 completeCompoundConfigs(); // Config Center should always being started first. // 从配置中心获取配置,包括应用配置和全局配置 // 把获取到的配置放入到Environment中的externalConfigurationMap和appExternalConfigurationMap中 // 并刷新所有的XxConfig的属性(除开ServiceConfig),刷新的意思就是将配置中心的配置覆盖调用XxConfig中的属性 // 调用AbstractInterfaceConfig#startConfigCenter() startConfigCenter(); checkDefault(); checkProtocol(); checkApplication(); // if protocol is not injvm checkRegistry // 如果protocol不是只有injvm协议,表示服务调用不是只在本机jvm里面调用,那就需要用到注册中心 if (!isOnlyInJvm()) { checkRegistry(); } // 刷新ServiceConfig,调用AbstractConfig#refresh() this.refresh(); // 如果配了metadataReportConfig,那么就刷新配置 checkMetadataReport(); if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException(" interface not allow null!"); } // 当前服务对应的实现类是一个GenericService,表示没有特定的接口 if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { // 加载接口 try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 刷新MethodConfig,并判断MethodConfig中对应的方法在接口中是否存在 checkInterfaceAndMethods(interfaceClass, methods); // 实现类是不是该接口类型 checkRef(); generic = Boolean.FALSE.toString(); } // local和stub一样,不建议使用了 if (local != null) { // 如果本地存根为true,则存根类为interfaceName + "Local" if (Boolean.TRUE.toString().equals(local)) { local = interfaceName + "Local"; } // 加载本地存根类 Class localClass; try { localClass = ClassUtils.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName); } } // 本地存根 if (stub != null) { // 如果本地存根为true,则存根类为interfaceName + "Stub" if (Boolean.TRUE.toString().equals(stub)) { stub = interfaceName + "Stub"; } Class stubClass; try { stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName); } } // 检查local和stub checkStubAndLocal(interfaceClass); // 检查mock checkMock(interfaceClass); } /** * 1.上节课我们在启动类上写了这个配置 * @PropertySource("classpath:/spring/dubbo-provider.properties") * 2.那么Spring启动的时候就会加载里面的配置到一些xxxConfig里面【Spring整合Dubbo的时候讲过】 * 3.@Service注解里配置的参数被首先读取到了ServiceBean里 * 4.接着会调用这个方法进行补全ServiceBean的配置,从哪里补全呢?就是从上面我们配置的 * dubbo-provider.properties 进行补全 * 5.ServcieBean继承了ServiceConfig,所以它两是一个意思,这里强调一下 * 免得后续看不明白 */ private void completeCompoundConfigs() { // 如果配置了provider,那么则从provider中获取信息赋值其他属性,在这些属性为空的情况下 if (provider != null) { if (application == null) { setApplication(provider.getApplication()); } if (module == null) { setModule(provider.getModule()); } if (registries == null) { setRegistries(provider.getRegistries()); } if (monitor == null) { setMonitor(provider.getMonitor()); } if (protocols == null) { setProtocols(provider.getProtocols()); } if (configCenter == null) { setConfigCenter(provider.getConfigCenter()); } } // 如果配置了module,那么则从module中获取信息赋值其他属性,在这些属性为空的情况下 if (module != null) { if (registries == null) { setRegistries(module.getRegistries()); } if (monitor == null) { setMonitor(module.getMonitor()); } } // 如果配置了application,那么则从application中获取信息赋值其他属性,在这些属性为空的情况下 if (application != null) { if (registries == null) { setRegistries(application.getRegistries()); } if (monitor == null) { setMonitor(application.getMonitor()); } } } ``` ##### AbstractInterfaceConfig ```java void startConfigCenter() { if (configCenter == null) { ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc); } // 如果配置了ConfigCenter if (this.configCenter != null) { // 从其他位置获取配置中心的相关属性信息,比如配置中心地址 // TODO there may have duplicate refresh this.configCenter.refresh(); // 属性更新后,从远程配置中心获取数据(应用配置,全局配置) prepareEnvironment(); } // 从配置中心取到配置数据后,刷新所有的XxConfig中的属性,除开ServiceConfig ConfigManager.getInstance().refreshAll(); } private void prepareEnvironment() { if (configCenter.isValid()) { if (!configCenter.checkOrUpdateInited()) { return; } // 动态配置中心,管理台上的配置中心 DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl()); // 如果是zookeeper,获取的就是/dubbo/config/dubbo/dubbo.properties节点中的内容 String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup()); String appGroup = application != null ? application.getName() : null; String appConfigContent = null; if (StringUtils.isNotEmpty(appGroup)) { // 获取的就是/dubbo/config/dubbo-demo-consumer-application/dubbo.properties节点中的内容 // 这里有bug appConfigContent = dynamicConfiguration.getProperties (StringUtils.isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(), appGroup ); } try { Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority()); //这个就是全局的,就是在网页上那个配置管理里的global Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent)); //这个就是某个应用的配置 Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent)); } catch (IOException e) { throw new IllegalStateException("Failed to parse configurations from Config Center.", e); } } } ``` ##### ConfigManager ```java public void refreshAll() { // refresh all configs here, getApplication().ifPresent(ApplicationConfig::refresh); getMonitor().ifPresent(MonitorConfig::refresh); getModule().ifPresent(ModuleConfig::refresh); getProtocols().values().forEach(ProtocolConfig::refresh); getRegistries().values().forEach(RegistryConfig::refresh); getProviders().values().forEach(ProviderConfig::refresh); getConsumers().values().forEach(ConsumerConfig::refresh); } ``` ##### AbstractConfig ```java /** * 1.刷新XxConfig * 2.一个XxConfig对象的属性可能是有值的,也可能是没有值的,这时需要从其他位置获取属性值,来进行属性的覆盖 * 覆盖的优先级,从大到小为系统变量->配置中心应用配置->配置中心全局配置->注解或xml中定义->dubbo.properties文件 * 3.以ServiceConfig为例,ServiceConfig中包括很多属性,比如timeout * 但是在定义一个Service时,如果在注解上没有配置timeout,那么就会其他地方获取timeout的配置 * 比如可以从系统变量->配置中心应用配置->配置中心全局配置->注解或xml中定义->dubbo.properties文件 * refresh是刷新,将当前ServiceConfig上的set方法所对应的属性更新为优先级最高的值 */ public void refresh() { try { /** * 1.这里确定的配置优先级从高到低是这样的 * 系统环境变量【JVM环境变量->操作系统环境变量】->配置中心应用配置->配置中心全局配置->dubbo.properties文件 * 2.调用的是Environment#getConfiguration() */ CompositeConfiguration compositeConfiguration = Environment.getInstance().getConfiguration(getPrefix(), getId()); // 表示XxConfig对象本身- AbstractConfig Configuration config = new ConfigConfigurationAdapter(this); // ServiceConfig if (Environment.getInstance().isConfigCenterFirst()) {//这个是默认的 // 优先级顺序: SystemConfiguration -> AppExternalConfiguration -> ExternalConfiguration -> AbstractConfig -> PropertiesConfiguration compositeConfiguration.addConfiguration(4, config); } else { // The sequence would be: SystemConfiguration -> AbstractConfig -> AppExternalConfiguration -> ExternalConfiguration -> PropertiesConfiguration compositeConfiguration.addConfiguration(2, config); } // loop methods, get override value and set the new value back to method Method[] methods = getClass().getMethods(); //ServiceBean for (Method method : methods) { // 是不是setXX()方法 if (MethodUtils.isSetter(method)) { // 获取xx配置项的value String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method))); // isTypeMatch() is called to avoid duplicate and incorrect update, for example, we have two 'setGeneric' methods in ReferenceConfig. if (StringUtils.isNotEmpty(value) && ClassUtils.isTypeMatch(method.getParameterTypes()[0], value)) { method.invoke(this, ClassUtils.convertPrimitive(method.getParameterTypes()[0], value)); } // 是不是setParameters()方法 } else if (isParametersSetter(method)) { // 获取parameter配置项的value String value = StringUtils.trim(compositeConfiguration.getString(extractPropertyName(getClass(), method))); if (StringUtils.isNotEmpty(value)) { Map map = invokeGetParameters(getClass(), this); map = map == null ? new HashMap<>() : map; map.putAll(convert(StringUtils.parseParameters(value), "")); invokeSetParameters(getClass(), this, map); } } } } catch (Exception e) { logger.error("Failed to override ", e); } } ``` ##### Environment ```java public CompositeConfiguration getConfiguration(String prefix, String id) { CompositeConfiguration compositeConfiguration = new CompositeConfiguration(); // Config center has the highest priority // JVM环境变量 compositeConfiguration.addConfiguration(this.getSystemConfig(prefix, id)); // 操作系统环境变量 compositeConfiguration.addConfiguration(this.getEnvironmentConfig(prefix, id)); // 配置中心APP配置 compositeConfiguration.addConfiguration(this.getAppExternalConfig(prefix, id)); // 配置中心Global配置 compositeConfiguration.addConfiguration(this.getExternalConfig(prefix, id)); // dubbo.properties中的配置 compositeConfiguration.addConfiguration(this.getPropertiesConfig(prefix, id)); return compositeConfiguration; } ``` #### 确定服务支持的协议 确定服务所支持的协议还是比较简单的,就是看用户配了多少个Protocol。和服务参数意义,Protocol也是可以在各个配置点进行配置的。 1. 首先在SpringBoot的application.properties文件中就可能配置了协议 2. 也可能在dubbo.properties文件中配置了协议 3. 也可能在配置中心中也配置了协议 4. 也可能通过-D的方式也配置了协议 所以在服务导出时,需要从以上几个地方获取协议,结果可能是一个协议,也可能是多个协议,从而确定出协议。 #### URL作用 1. 资源 1. 注册中心URL:zookeeper://ip+port?dynamic=true 2. 服务:dubbo://ip+port/接口名?timeout=3000 3. 服务:http://ip+port/接口名?timeout=3000 2. 方便扩展 #### 构造服务最终的URL 有了确定的协议,服务名,服务参数后,自然就可以组装成服务的URL了。 但是还有一点是非常重要的,在Dubbo中支持服务动态配置,注意,这个和配置中心不是同一概念,动态配置是可以在服务导出后动态的去修改服务配置的,而配置中心则不能达到这一的效果(这个我要在确定一下)。 动态配置,其实就是继续给服务增加了一些参数,所以在把服务的URL注册到注册中心去之前,得先按照动态配置中所添加的配置重写一下URL,也就是应用上动态配置中的参数。 只有这样作完之后得到的URL才是**真正准确**的服务提供者URL。 ### 开始服务注册相关过程 > 1. 根据服务支持的不同协议,启动不同的Server,用来接收和处理请求 > 2. 将服务URL注册到注册中心去 > 3. 因为Dubbo支持动态配置服务参数,所以服务导出时还需要绑定一个监听器Listener来监听服务的参数是否有修改,如果发现有修改,则需要重新进行导出 > > 这三个步骤都会在`ServiceConfig#export()#doExport()` 这个方法里做,流程比较复杂,就直接看代码吧 #### 公用源码 > 这个部分的源码是前面三个步骤公用的 ##### ServiceConfig ```java public synchronized void export() { //读取服务配置 checkAndUpdateSubConfigs(); // 检查服务是否需要导出,@Service里可以配置 if (!shouldExport()) { return; } // 检查是否需要延迟发布,@Service里可以配置 if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // 导出服务 doExport(); } } protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } // 已经导出了,就不再导出了 if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); } private void doExportUrls() { // registryURL 表示一个注册中心 List registryURLs = loadRegistries(true); //配置的每一个protocol都会产生一个dubbo服务,所以这里是循环配置的协议, // 我们这里假设配置了dubbo,但是配了两个端口,这样也算两个protocol for (ProtocolConfig protocolConfig : protocols) { // pathKey = group/contextpath/path:version // 例子:myGroup/user/org.apache.dubbo.demo.DemoService:1.0.1 String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); // ProviderModel中存在服务提供者访问路径,实现类,接口,以及接口中的各个方法对应的ProviderMethodModel // ProviderMethodModel表示某一个方法,方法名,所属的服务的, ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); // ApplicationModel表示应用中有哪些服务提供者和引用了哪些服务 ApplicationModel.initProviderModel(pathKey, providerModel); // 重点,每一个协议都会注册一个服务 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List registryURLs) { // protocolConfig表示某个协议,registryURLs表示所有的注册中心 // 如果配置的某个协议,没有配置name,那么默认为dubbo String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = DUBBO; } // 这个map表示服务url的参数 Map map = new HashMap(); map.put(SIDE_KEY, PROVIDER_SIDE); appendRuntimeParameters(map); // 监控中心参数 appendParameters(map, metrics); // 应用相关参数 appendParameters(map, application); // 模块相关参数 appendParameters(map, module); // remove 'default.' prefix for configs from ProviderConfig // appendParameters(map, provider, Constants.DEFAULT_KEY); // 提供者相关参数 appendParameters(map, provider); // 协议相关参数 appendParameters(map, protocolConfig); // 服务本身相关参数 appendParameters(map, this); // 服务中某些方法参数,@Service里可以针对某些方法配置某些参数 if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { // 某个方法的配置参数,注意有prefix appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; // 如果某个方法配置存在xx.retry=false,则改成xx.retry=0 if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if (Boolean.FALSE.toString().equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { // 遍历当前方法配置中的参数配置 for (ArgumentConfig argument : arguments) { // 如果配置了type,则遍历当前接口的所有方法,然后找到方法名和当前方法名相等的方法,可能存在多个 // 如果配置了index,则看index对应位置的参数类型是否等于type,如果相等,则向map中存入argument对象中的参数 // 如果没有配置index,那么则遍历方法所有的参数类型,等于type则向map中存入argument对象中的参数 // 如果没有配置type,但配置了index,则把对应位置的argument放入map // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: or "); } } } } // end of methods for } if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } // 通过接口对应的Wrapper,拿到接口中所有的方法名字 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet(Arrays.asList(methods)), ",")); } } // Token是为了防止服务被消费者直接调用(伪造http请求),可以在@Service里配置 // 这里防止的是某些消费者不是从注册中心拿到的URL调用提供者,而是消费者自己拼出的URL进行调用 // 服务调用的是会有个Tokenfilter过滤器进行拦截(后面讲) if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } // export service // 通过该host和port访问该服务 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // 服务url URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); /** * url:http://192.168.40.17:80/org.apache.dubbo.demo.DemoService?anyhost=true&application= * dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService * &bind.ip=192.168.40.17&bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true& * generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=285072 * &release=&side=provider×tamp=1585206500409 * * 1.可以通过ConfiguratorFactory,对服务url再次进行配置 * 2.意思就是可以自己实现一个ConfiguratorFactory的实现类,实现对应方法对URL进行自定义修改 * 3.这个实现类是通过SPI进行加载的 */ if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); // scope可能为null,remote, local,none // don't export when none is configured if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // 如果scope为none,则不会进行任何的服务导出,既不会远程,也不会本地 // export to local if the config is not remote (export to remote only when config is remote) if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { // 如果scope不是remote,则会进行本地导出,会把当前url的protocol改为injvm,然后进行导出 // 这样的话就只有本地的JVM才能调用 exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { // 如果scope不是local,则会进行远程导出 if (CollectionUtils.isNotEmpty(registryURLs)) { // 如果有注册中心,则将服务注册到注册中心 for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register // 如果是injvm,则不需要进行注册中心注册 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } // 该服务是否是动态,对应zookeeper上表示是否是临时节点,对应dubbo中的功能就是静态服务 url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // 拿到监控中心地址 URL monitorUrl = loadMonitor(registryURL); // 当前服务连接哪个监控中心 if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } // 服务的register参数,如果为true,则表示要注册到注册中心 if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } } // For providers, this is used to enable custom proxy to generate invoker // 服务使用的动态代理机制,如果为空则使用javassit String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } /** * 1.生成一个当前服务接口的代理对象 * 2.使用代理生成一个Invoker,Invoker表示服务提供者的代理,可以使用Invoker的invoke方法执行服务 * 就是把注册中心的URL和服务的URL拼起来,registryURL + "export" + url,对应的url为: * registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService? application=dubbo-demo-annotation-provider&dubbo=2.0.2&export= http://192.168.40.17:80/org.apache.dubbo.demo.DemoService? anyhost=true&application=dubbo-demo-annotation-provider&bean.name= ServiceBean:org.apache.dubbo.demo.DemoService&bind.ip=192.168.40.17& bind.port=80&deprecated=false&dubbo=2.0.2&dynamic=true&generic= false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello& pid=19472&release=&side=provider×tamp=1585207994860&pid=19472& registry=zookeeper×tamp=1585207994828 * * 3.这个Invoker中包括了服务的实现者、服务接口类、服务的注册地址(针对当前服务的,参数export 指定了当前服务) * 4.此invoker表示一个可执行的服务,调用invoker的invoke()方法即可执行服务,同时此invoker也可用来导出 * 在服务导出(注册)的时候,invoker只是存在某一个地方,等消费者调用服务的时候才会执行 * 5.ref就是之前讲过的服务具体实现类 * 6.这里第二个参数传的是URL(具体就是registryURL),后面exporter马上会用 */ Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); // invoker.invoke(Invocation) // DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置 //把this(也就是serviceconfig服务参数)和invoker 服务实现类等 再包装一下 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); /** * 使用特定的协议来对服务进行导出,这里的协议为RegistryProtocol,导出成功后得到一个Exporter * 1.exporter导出器怎么确定用哪个实现类的export方法呢?SPI机制会判断哪个invoker里面有getURL这个方法 * 【这里不知道怎么调哪个类的哪个方法的请看前面讲的SPI】 * 2.因为前面invoker传的是registryURL,所以我们这里就会使用RegistryProtocol进行服务注册 * registryURL可以理解为注册中心的注册协议吧,debug这里,就会看到是这样的registry://127.0.0.1:2181...... * 3.注册完了之后,使用DubboProtocol进行导出 * 4.到此为止做了哪些事情? ServiceBean.export()-->刷新ServiceBean的参数-->得到注册中心URL和协议URL--> * 遍历每个协议URL-->组成服务URL-->生成可执行服务Invoker-->导出服务 * 5.这里就会调用RegistryProtocol#export(Invoker) */ Exporter exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // 没有配置注册中心时,也会导出服务 if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ // 根据服务url,讲服务的元信息存入元数据中心 MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); } ``` #### 启动Netty,Tomcat等Server源码 ##### RegistryProtocol ```java @SuppressWarnings("unchecked") private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper) bounds.computeIfAbsent(key, s -> { Invoker invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); /** * 1.这里又是SPI的知识。protocol属性的值是哪来的,是在SPI中注入进来的,是一个代理类 * 2.InvokerDelegate的父类InvokerWrapper有getURL方法,所以最终SPI决定调哪个扩展点 * 是通过providerUrl决定的,而providerUrl这里基本就是DubboProtocol或HttpProtocol去export * 3.我们这里用的是dubbo协议,所以会调用DubboProtocol * 4.为什么需要ExporterChangeableWrapper?方便注销已经被导出的服务 */ return new ExporterChangeableWrapper<>((Exporter) protocol.export(invokerDelegate), originInvoker); }); } ``` ##### DubboProtocol ```java public Exporter export(Invoker invoker) throws RpcException { URL url = invoker.getUrl(); // 唯一标识一个服务的key String key = serviceKey(url); // 构造一个Exporter进行服务导出 DubboExporter exporter = new DubboExporter(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { // 服务的stub方法 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 开启NettyServer // 请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务 openServer(url); // 特殊的一些序列化机制,比如kryo提供了注册机制来注册类,提高序列化和反序列化的速度 optimizeSerialization(url); return exporter; } private void openServer(URL url) { // find server. String key = url.getAddress(); // 获得ip地址和port, 192.168.40.17:20880 // NettyClient, NettyServer //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // 缓存Server对象 ExchangeServer server = serverMap.get(key); // DCL,Double Check Lock if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // 创建Server,并进行缓存 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override // 服务重新导出时,就会走这里 这里会调用HeaderExchangeServer#reset server.reset(url); } } } ``` ##### AbstractProtocol ```java protected static String serviceKey(URL url) { int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); // path就是@Service注解配的,path配了就用,不配就不用。这四个参数作用就是生成唯一标识一个服务的key // 从这里就可以看出,协议相同的只要端口号不一样,依然算不同的服务 return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY)); } ``` ##### 启动Server总结 > 这里不是dubbo的核心,就不贴源码了 在服务URL中指定了协议,比如Http协议、Dubbo协议。根据不同的协议启动对应的Server。 比如Http协议就启动Tomcat、Jetty。 比如Dubbo协议就启动Netty。 不能只启动Server,还需要绑定一个RequestHandler,用来处理请求。 比如,Http协议对应的就是InternalHandler。Dubbo协议对应的就是ExchangeHandler。 这里来详细分析一下Dubbo协议所启动的Server。 1. 调用DubboProtocol的openServer(URL url)方法开启启动Server 2. 调用DubboProtocol的createServer(url)方法,在createServer()方法中调用**Exchangers.bind(url, requestHandler)**得到一个ExchangeServer 3. 其中requestHandler表示请求处理器,用来处理请求 4. 在**Exchangers.bind(url, requestHandler)**中,先会根据URL得到一个Exchanger,默认为HeaderExchanger 5. HeaderExchanger中包括HeaderExchangeClient、HeaderExchangeServer 6. HeaderExchangeClient负责发送心跳,HeaderExchangeServer负责接收心跳,如果超时则会关闭channel 7. 在构造HeaderExchangeServer之前,会通过调用Transporters._bind_(url, **new** DecodeHandler(**new** HeaderExchangeHandler(handler)))方法的到一个Server 8. 默认会使用getTransporter去bind(URL url, ChannelHandler listener)从而得到一个Servlet,此时的listener就是外部传进来的DecodeHandler 9. 在NettyTransporter的bind方法中会去**new** NettyServer(url, listener),所以上面返回的Server默认就是NettyServer 10. 在构造NettyServer时,会调用ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER\_THREAD\_POOL_NAME))再构造一个ChannelHandler。 11. wrap中的handler就是上面的listener 12. 在wrap方法中会调用new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));构造一个ChannelHandler。 13. 构造完ChannelHandler后,就是真正的去开启Server了,会调用AbstractServer抽象类的doOpen方法。 14. 在NettyServer中,会实现doOpen方法,会调用**new** NettyServerHandler(getUrl(), **this**)构造一个NettyServerHandler,并bind地址 15. 至此,DubboProtocol协议的启动Server流程就结束。 总结一下DubboProtocol协议的RequestHandler链路: 1. NettyServerHandler:与NettyServer直接绑定的请求处理器,负责从Netty接收到请求,channelRead()方法获取到请求,然后调用下一层的Handler(NettyServer)的received()方法将请求传递下去,此时的请求还是Object msg 2. NettyServer:NettyServer的父类AbstractPeer中存在received(),该方法没有做什么,直接把msg传递给下一层Handler(MultiMessageHandler) 3. MultiMessageHandler:此Handler会判断msg是否是一个MultiMessage,如果是,则对MultiMessage进行拆分,则把拆分出来的msg传递给下层Handler(HeartbeatHandler),如果不是,则直接把msg传递给下层Handler(HeartbeatHandler) 4. HeartbeatHandler:此Handler通过received()方法接收到msg,然后判断该msg是不是一个心跳请求或心跳响应,如果是心跳请求,则此Handler返回一个Response对象(很简单的一个对象),如果是心跳响应,则打印一个日志,不会有其他逻辑,如果都不是,则把msg传递给下层Handler(AllChannelHandler)。 5. AllChannelHandler:此Handler通过received()方法接收到msg,然后把msg封装为一个ChannelEventRunnable对象,并把ChannelEventRunnable扔到线程池中去,异步去处理该msg。在ChannelEventRunnable中会把msg交给下一个Handler(DecodeHandler) 6. DecodeHandler:此Handler通过received()方法接收到msg,会对msg解析decode解码,然后交给下一个Handler(HeaderExchangeHandler) 7. HeaderExchangeHandler:此Handler通过received()方法接收到msg,会判断msg的类型 1. 如果Request是TwoWay,则会调用下一个Handler(DubboProtocol中的**requestHandler**)的reply方法得到一个结果,然后返回 2. 如果Request不是TwoWay,则会调用下一个Handler(DubboProtocol中的**requestHandler**)的received方法处理该请求,不会返回结果 8. requestHandler:此Handler是真正的处理请求逻辑,在received()方法中,如果msg是Invocation,则会调用reply方法,但不会返回reply方法所返回的结果,在reply方法中把msg强制转换为Invocation类型 inv,然后根据inv得到对应的服务Invoker,然后调用invoke(inv)方法,得到结果。 #### 服务注册源码 ##### RegistryProtocol ```java public void register(URL registryUrl, URL registeredProviderUrl) { // 这里最终也是通过SPI机制,判断传过来的是什么,我们这里在前面把registry转成了zookeeper的URL Registry registry = registryFactory.getRegistry(registryUrl); // 所以这里会调用ZookeeperRegistry的register方法,实际上是先调用ZookeeperRegistry的父类FailbackRegistry registry.register(registeredProviderUrl); } ``` ##### FailbackRegistry.java ```java public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // 然后在这里调用ZookeeperRegistry#doRegister,这个URL参数很明显又是一个SPI的提现 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly addFailedRegistered(url); } } ``` ##### ZookeeperRegistry ```java @Override public void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } ``` ### 服务监听器原理 > 动态配置不能改端口 #### 服务监听器原理总结 1. 服务在导出的过程中需要向动态配置中心的数据进行订阅,以便当管理人员修改了动态配置中心中对应服务的参数后,服务提供者能及时做出变化。此功能涉及到版本兼容,因为在Dubbo2.7之前也存在此功能,Dubbo2.7开始对此功能进行了调整。 2. 在Dubbo2.7之前,仅支持多某个服务的动态配置 3. 在Dubbo2.7之后,不仅支持对单个服务的动态配置,也支持对某个应用的动态配置(相当于对这个应用下的所有服务生效) 4. 为了达到这个功能,需要利用Zookeeper的Watcher机制,所以对于服务提供者而言,我到底监听哪个Zookeeper节点的数据变化呢? 5. 这个节点是有规则的,并且在Dubbo2.7前后也不一样: 1. Dubbo2.7之前:监听的zk路径是:`/dubbo/org.apache.dubbo.demo.DemoService/configurators/override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_config=true&dynamic=false&enabled=true&timeout=6000`注意,注意监听的是节点名字的变化,而不是节点内容 2. Dubbo2.7之后,监听的zk路径是: 1. 服务: `/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators`节点的内容 2. 应用: `/dubbo/config/dubbo/dubbo-demo-provider-application.configurators`节点的内容 6. 注意,要和配置中心的路径区分开来,配置中心的路径是: 1. 应用:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties节点的内容 2. 全局:/dubbo/config/dubbo/dubbo.properties节点的内容 > 所以在一个服务进行导出时,需要在服务提供者端给当前服务生成一个对应的监听器实例,这个监听器实例为OverrideListener,它负责监听对应服务的动态配置变化,并且根据动态配置中心的参数重写服务URL。 除开有OverrideListener之外,在Dubbo2.7之后增加了另外两个: 1. ProviderConfigurationListener:监听的是应用的动态配置数据修改,所以它是在RegistryProtocol类中的一个属性,并且是随着RegistryProtocol实例化而实例化好的,一个应用中只有一个 2. ServiceConfigurationListener:监听的是服务的动态配置数据修改,和OverrideListener类似,也是对应一个服务的,所以在每个服务进行导出时都会生成一个,实际上ServiceConfigurationListener的内部有一个属性就是OverrideListener,所以当ServiceConfigurationListener监听数据发生了变化时,就会把配置中心的最新数据交给OverrideListener去重写服务URL。 3. 同时在RegistryProtocol类中保存了所有服务所对应的OverrideListener,所以实际上当ProviderConfigurationListener监听到数据发生了变化时,也会把它所得到的最新数据依次调用每个OverrideListener去重写服务对应的服务URL。 4. ProviderConfigurationListener会监听/dubbo/config/dubbo/dubbo-demo-provider-application.configurators节点 5. ServiceConfigurationListener会监听/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点 **整理修改动态配置触发流程:** 1. 修改服务动态配置,底层会修改Zookeeper中的数据, 1. /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点的内容 2. ServiceConfigurationListener会监听到节点内容的变化,会触发ServiceConfigurationListener的父类AbstractConfiguratorListener的process(ConfigChangeEvent event)方法 3. ConfigChangeEvent表示一个事件,事件中有事件类型,还有事件内容(节点内容),还有触发这个事件的节点名字,事件类型有三个: 1. ADDED 2. MODIFIED 3. DELETED 4. 当接收到一个ConfigChangeEvent事件后,会根据事件类型做对应的处理 1. ADDED、MODIFIED:会根据节点内容去生成override://协议的URL,然后根据URL去生成Configurator, Configurator对象很重要,表示一个配置器,根据配置器可以去重写URL 2. DELETED:删除ServiceConfigurationListener内的所有的Configurator 5. 生成了Configurator后,调用notifyOverrides()方法对服务URL进行重写 6. 注意,每次重写并不仅仅只是用到上面所生成的Configurator,每次重写要用到所有的Configurator,包括本服务的Configurator,也包括本应用的Configurator,也包括老版本管理台的Configurator,重写URL的逻辑如下: 1. 从exporter中获取目前已经导出了的服务URL-currentUrl 2. 根据老版本管理台的Configurator重写服务URL 3. 根据providerConfigurationListener中的Configurator重写服务URL 4. 根据serviceConfigurationListeners中对应的服务的Configurator重写服务URL 5. 如果重写之后newUrl和currentUrl相等,那么不需要做什么了 6. 如果重写之后newUrl和currentUrl不相等,则需要进行**服务重新导出**: 1. 根据newUrl进行导出,注意,这里只是就是调用DubboProtocol的export,再次去启动NettyServer 2. 对newUrl进行简化,简化为registeredProviderUrl 3. 调用RegistryProtocol的unregister()方法,把当前服务之前的服务提供URL从注册中心删掉 4. 调用RegistryProtocol的register()方法,把新的registeredProviderUrl注册到注册中心 #### 服务监听器绑定源码 ##### RegistryProtocol ```java @Override public Exporter export(final Invoker originInvoker) throws RpcException { // 导出服务 // registry:// ---> RegistryProtocol // zookeeper:// ---> ZookeeperRegistry // dubbo:// ---> DubboProtocol /** * 1.registry://xxx?xx=xx®istry=zookeeper ---> zookeeper://xxx?xx=xx 表示注册中心 * 这里就是把registry替换成zookeeper * 2.示例:zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application= * dubbo-demo-provider-application&dubbo=2.0.2&export=dubbo://192.168.40.17:20880/ * org.apache.dubbo.demo.DemoService?anyhost=true&application= * dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService * &bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2& * dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService& * logger=log4j&methods=sayHello&pid=27656&release=2.7.0&side=provider&timeout=3000& * timestamp=1590735956489&logger=log4j&pid=27656&release=2.7.0×tamp=1590735956479 */ URL registryUrl = getRegistryUrl(originInvoker); // 得到服务提供者url,表示服务提供者 /** * 1.这里就是把之前export后面拼的dubbo服务url拿出来 * 2.示例:dubbo://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application= * dubbo-demo-provider-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService& * bind.ip=192.168.40.17&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true& * generic=false&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello * &pid=27656&release=2.7.0&side=provider&timeout=3000×tamp=1590735956489 * 3.服务导出最终的目的就是要把providerUrl存到注册中心上,只不过中间有一些其他操作 */ URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. // overrideSubscribeUrl是老版本的动态配置监听url,表示了需要监听的服务以及监听的类型(configurators, 这是老版本上的动态配置) // 在服务提供者url的基础上,生成一个overrideSubscribeUrl,协议为provider://,增加参数category=configurators&check=false final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); // 一个overrideSubscribeUrl对应一个OverrideListener,用来监听变化事件,监听到overrideSubscribeUrl的变化后, // OverrideListener就会根据变化进行相应处理,具体处理逻辑看OverrideListener的实现 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); /** * 在这个方法里会利用providerConfigurationListener和serviceConfigurationListener去重写providerUrl * providerConfigurationListener表示应用级别的动态配置监听器,providerConfigurationListener是RegistyProtocol的一个属性 * serviceConfigurationListener表示服务级别的动态配置监听器,serviceConfigurationListener是在每暴露一个服务时就会生成一个 * 这两个监听器都是新版本中的监听器 * 新版本监听的zk路径是: * 服务: /dubbo/config/dubbo/org.apache.dubbo.demo.DemoService.configurators节点的内容 * 应用: /dubbo/config/dubbo/dubbo-demo-provider-application.configurators节点的内容 * 注意,要和配置中心的路径区分开来,配置中心的路径是: * 应用:/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService/dubbo.properties节点的内容 * 全局:/dubbo/config/dubbo/dubbo.properties节点的内容 */ providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); // export invoker // 根据动态配置重写了providerUrl之后,就会调用DubboProtocol或HttpProtocol去进行导出服务了, // 这里会启动netty,启动tomcat这些 final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl); // url to registry // 得到注册中心-ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 得到存入到注册中心去的providerUrl,会对服务提供者url中的参数进行简化, // 因为有些参数存到注册中心是没有用的 final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // 将当前服务提供者Invoker,以及该服务对应的注册中心地址,以及简化后的服务url存入ProviderConsumerRegTable ProviderInvokerWrapper providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish //是否需要注册到注册中心 boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { // 注册服务,把简化后的服务提供者url注册到registryUrl中去 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } /** * 针对老版本的动态配置,需要把overrideSubscribeListener绑定到overrideSubscribeUrl上去进行监听 * 兼容老版本的配置修改,利用overrideSubscribeListener去监听旧版本的动态配置变化 * 监听overrideSubscribeUrl provider://192.168.40.17:20880/org.apache.dubbo.demo.DemoService?anyhost=true& * application=dubbo-demo-annotation-provider&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService& * bind.ip=192.168.40.17&bind.port=20880&category=configurators&check=false&deprecated=false&dubbo=2.0.2& * dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=416332& * release=&side=provider×tamp=1585318241955 * 那么新版本的providerConfigurationListener和serviceConfigurationListener是在什么时候进行订阅的呢?在这两个类构造的时候 * Deprecated! Subscribe to override rules in 2.6.x or before. * 老版本监听的zk路径是:/dubbo/org.apache.dubbo.demo.DemoService/configurators/override://0.0.0.0/org.apache.dubbo.demo.DemoService?category=configurators&compatible_config=true&dynamic=false&enabled=true&timeout=6000 * 监听的是路径的内容,不是节点的内容 */ registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); } private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) { /** * 1.应用配置,providerConfigurationListener是在属性那里直接初始化好的, * providerConfigurationListener会监听配置中心的应用配置信息变动 * 这个是每一个应用只有一个providerConfigurationListener * 2.首先这里流程是: * 1.ProviderConfigurationListener通过构造函数调用父类AbstractConfiguratorListener * #initWith方法 * 2.在initWith方法中通过传进来的路径key,监听注册中心(常用的是zookeeper) * key路径下的节点,会先从注册中心拿到当前配置然后转换成configurators * 3.接着这里调用overrideUrl,用前面的configurators生成新的providerUrl * 4.这里因为之前的providerUrl是经过@Service注解,配置中心文件(yml或properties) * 还有-D这种启动参数里的配置,组合成的一个URL。但是这个providerUrl还没有经过 * 网页端的动态配置,所以这里需要重写下URL * 5.ServiceConfigurationListener同理,而且ServiceConfigurationListener代码顺序在后面 * 所以很明显'服务配置'会覆盖'应用配置' */ providerUrl = providerConfigurationListener.overrideUrl(providerUrl); // 服务配置,new ServiceConfigurationListener的时候回初始化,ServiceConfigurationListener会监听配置中心的服务信息配置信息变动 // 这个是每个服务都会重新new一个ServiceConfigurationListener ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener); serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener); return serviceConfigurationListener.overrideUrl(providerUrl); } //RegistryProtocol内部类 public ProviderConfigurationListener() { //订阅 应用名+".configurators" 这里就是新版本ProviderConfigurationListener的监听路径 this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); } ``` ##### AbstractConfiguratorListener ```java // 在构造ProviderConfigurationListener和ServiceConfigurationListener都会调用到这个方法 // 完成Listener自身订阅到对应的应用和服务 // 订阅关系绑定完了之后,主动从动态配置中心获取一下对应的配置数据生成configurators,后面需要重写providerUrl protected final void initWith(String key) { //这里拿到的就是注册中心,我们大部分情况用的是zookeeper DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration(); // 添加Listener,进行了订阅 dynamicConfiguration.addListener(key, this); // 从配置中心ConfigCenter获取属于当前应用的动态配置数据,从zk中拿到原始数据(主动从配置中心获取数据) String rawConfig = dynamicConfiguration.getRule(key, DynamicConfiguration.DEFAULT_GROUP); // 如果存在应用配置信息则根据配置信息生成Configurator if (!StringUtils.isEmpty(rawConfig)) { genConfiguratorsFromRawRule(rawConfig); } } private boolean genConfiguratorsFromRawRule(String rawConfig) { boolean parseSuccess = true; try { // parseConfigurators will recognize app/service config automatically. // 先把应用或服务配置转成url,再根据url生成对应的Configurator configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(rawConfig)) .orElse(configurators); } catch (Exception e) { logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " + rawConfig, e); parseSuccess = false; } return parseSuccess; } ``` #### 服务监听器监听源码 ##### RegistryProtocol ```java private class ProviderConfigurationListener extends AbstractConfiguratorListener { public ProviderConfigurationListener() { //订阅 应用名+".configurators" 这里就是新版本ProviderConfigurationListener的监听路径 this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX); } /** * Get existing configuration rule and override provider url before exporting. * * @param providerUrl * @param * @return */ private URL overrideUrl(URL providerUrl) { // 通过configurators去修改/装配providerUrl return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl); } @Override protected void notifyOverrides() { overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary()); } } private class ServiceConfigurationListener extends AbstractConfiguratorListener { private URL providerUrl; private OverrideListener notifyListener; public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) { this.providerUrl = providerUrl; this.notifyListener = notifyListener; // 订阅 服务接口名+group+version+".configurators" this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX); } private URL overrideUrl(URL providerUrl) { return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl); } //这里是监听入口 @Override protected void notifyOverrides() { notifyListener.doOverrideIfNecessary(); } } public synchronized void doOverrideIfNecessary() { final Invoker invoker; if (originInvoker instanceof InvokerDelegate) { invoker = ((InvokerDelegate) originInvoker).getInvoker(); } else { invoker = originInvoker; } //The origin invoker 当前服务的原始服务提供者url,没有经过任何动态配置改变的URL URL originUrl = RegistryProtocol.this.getProviderUrl(invoker); String key = getCacheKey(originInvoker); ExporterChangeableWrapper exporter = bounds.get(key); if (exporter == null) { logger.warn(new IllegalStateException("error state, exporter should not be null")); return; } //The current, may have been merged many times,事件触发之前,当前服务被导出的url URL currentUrl = exporter.getInvoker().getUrl(); //根据configurators修改url,configurators是全量的,并不是某个新增的或删除的, // 所以是基于原始的url进行修改,并不是基于currentUrl,这里是老版本的configurators //Merged with this configuration URL newUrl = getConfigedInvokerUrl(configurators, originUrl); // 这是新版本的configurators newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl); newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey()) .getConfigurators(), newUrl); // 修改过的url如果和目前的url不相同,则重新按newUrl导出 if (!currentUrl.equals(newUrl)) { RegistryProtocol.this.reExport(originInvoker, newUrl); logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl); } } public void reExport(final Invoker originInvoker, URL newInvokerUrl) { // 根据newInvokerUrl进行导出 // update local exporter ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl); // 获取准确的ProviderUrl // update registry URL registryUrl = getRegistryUrl(originInvoker); // 对于一个服务提供者url,在注册到注册中心时,会先进行简化 final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl); //decide if we need to re-publish // 根据getServiceKey获取ProviderInvokerWrapper ProviderInvokerWrapper providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker); // 生成一个新的ProviderInvokerWrapper ProviderInvokerWrapper newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); /** * Only if the new url going to Registry is different with the previous one should we do unregister and register. * 如果新的服务提供者url简化后的url和这个服务之前的服务提供者url简化后的url不相等,则需要把新的简化后的服务提供者url注册到注册中心去 */ if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) { unregister(registryUrl, providerInvokerWrapper.getProviderUrl()); register(registryUrl, registeredProviderUrl); newProviderInvokerWrapper.setReg(true); } exporter.setRegisterUrl(registeredProviderUrl); } private ExporterChangeableWrapper doChangeLocalExport(final Invoker originInvoker, URL newInvokerUrl) { String key = getCacheKey(originInvoker); final ExporterChangeableWrapper exporter = (ExporterChangeableWrapper) bounds.get(key); if (exporter == null) { logger.warn(new IllegalStateException("error state, exporter should not be null")); } else { // 到这里才能真正明白,为什么需要InvokerDelegate // InvokerDelegate表示一个调用者,由invoker+url构成,invoker不变,url可变 final Invoker invokerDelegate = new InvokerDelegate(originInvoker, newInvokerUrl); // 这里最后又会走到DubboProtocol#export 那里的逻辑,服务重新导出前面见过了 exporter.setExporter(protocol.export(invokerDelegate)); } return exporter; } ``` Q:这里引出一个问题,配置改变之后Netty,tomcat需要重启吗? A:不需要,为什么?前面的DubboProtocol#export 那里的reset逻辑讲过 #### 服务重新导出源码 ##### DubboProtocol ```java public Exporter export(Invoker invoker) throws RpcException { // ....省略 // 开启NettyServer // 请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务 openServer(url); // 特殊的一些序列化机制,比如kryo提供了注册机制来注册类,提高序列化和反序列化的速度 optimizeSerialization(url); return exporter; } private void openServer(URL url) { // find server. String key = url.getAddress(); // 获得ip地址和port, 192.168.40.17:20880 // NettyClient, NettyServer //client can export a service which's only for server to invoke boolean isServer = url.getParameter(IS_SERVER_KEY, true); if (isServer) { // 缓存Server对象 ExchangeServer server = serverMap.get(key); // DCL,Double Check Lock if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { // 创建Server,并进行缓存 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override // 服务重新导出时,就会走这里 这里会调用HeaderExchangeServer#reset server.reset(url); } } } ``` ##### HeaderExchangeServer ```java //启动netty的时候会调用这个 public HeaderExchangeServer(Server server) { Assert.notNull(server, "server == null"); this.server = server; // 启动定义关闭Channel(socket)的Task startIdleCheckTask(getUrl()); } private void startIdleCheckTask(URL url) { if (!server.canHandleIdle()) { // 底层NettyServer自己有心跳机制,那么上层的ExchangeServer就不用开启心跳任务了 AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); int idleTimeout = getIdleTimeout(url); long idleTimeoutTick = calculateLeastDuration(idleTimeout); // 定义关闭Channel的Task CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); this.closeTimerTask = closeTimerTask; // init task and start timer. // 定时运行closeTimerTask IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); } } public void reset(URL url) { server.reset(url); try { int currHeartbeat = getHeartbeat(getUrl()); int currIdleTimeout = getIdleTimeout(getUrl()); int heartbeat = getHeartbeat(url); int idleTimeout = getIdleTimeout(url); /** * 1.动态改配置,重新导出服务时不需要重新启动netty,tomcat等等 * 2.这里直接关闭那个服务的channel任务,然后根据新的url重启一个任务就行了 */ if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) { cancelCloseTask(); startIdleCheckTask(url); } } catch (Throwable t) { logger.error(t.getMessage(), t); } } ``` ##### CloseTimerTask ```java @Override protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); Long now = now(); // check ping & pong at server // 表示Server端有多长时间没有读到过数据或写出过数据了,说白就是超时了 if ((lastRead != null && now - lastRead > idleTimeout) || (lastWrite != null && now - lastWrite > idleTimeout)) { logger.warn("Close channel " + channel + ", because idleCheck timeout: " + idleTimeout + "ms"); channel.close(); } } catch (Throwable t) { logger.warn("Exception when close remote channel " + channel.getRemoteAddress(), t); } } ``` ### 服务导出源码流程总结 1. ServiceBean.export()方法是导出的入口方法,会执行ServiceConfig.export()方法完成服务导出,导出完了之后会发布一个Spring事件ServiceBeanExportedEvent 2. 在ServiceConfig.export()方法中会先调用checkAndUpdateSubConfigs(),这个方法主要完成AbstractConfig的参数刷新(从配置中心获取参数等等),AbstractConfig是指ApplicationConfig、ProtocolConfig、ServiceConfig等等,刷新完后会检查stub、local、mock等参数是否配置正确 3. 参数刷新和检查完成了之后,就会开始导出服务,如果配置了延迟导出,那么则按指定的时间利用ScheduledExecutorService来进行延迟导出 4. 否则调用doExport()进行服务导出 5. 继续调用doExportUrls()进行服务导出 6. 首先通过loadRegistries()方法获得所配置的注册中心的URL,可能配了多个配置中心,那么当前所导出的服务需要注册到每个配置中心去,这里,注册中心的是以URL的方式来表示的,使用的是什么注册中心、注册中心的地址和端口,给注册中心所配置的参数等等,都会存在在URL上,此URL以**registry://**开始 7. 获得到注册中心的registryURLs之后,就会遍历当前服务所有的ProtocolConfig,调用doExportUrlsFor1Protocol(protocolConfig, registryURLs);方法把当前服务按每个协议每个注册中心分别进行导出 8. 在doExportUrlsFor1Protocol()方法中,会先构造一个服务URL,包括 1. 服务的协议dubbo://, 2. 服务的IP和PORT,如果指定了就取指定的,没有指定IP就获取服务器上网卡的IP, 3. 以及服务的PATH,如果没有指定PATH参数,则取接口名 4. 以及服务的参数,参数包括服务的参数,服务中某个方法的参数 5. 最终得到的URL类似: dubbo://192.168.1.110:20880/com.tuling.DemoService?timeout=3000&&sayHello.loadbalance=random 9. 得到服务的URL之后,会把服务URL作为一个参数添加到registryURL中去,然后把registryURL、服务的接口、当前服务实现类ref生成一个Invoker代理对象,再把这个代理对象和当前ServiceConfig对象包装成一个DelegateProviderMetaDataInvoker对象,DelegateProviderMetaDataInvoker就表示了完整的一个服务 10. 接下来就会使用Protocol去export导出服务了,导出之后将得到一个Exporter对象(该Exporter对象,可以理解为主要可以用来卸载(unexport)服务,什么时候会卸载服务?在优雅关闭Dubbo应用的时候) 11. 接下来我们来详细看看Protocol是怎么导出服务的? 12. 但调用**protocol**.export(wrapperInvoker)方法时,因为protocol是Protocol接口的一个Adaptive对象,所以此时会根据wrapperInvoker的genUrl方法得到一个url,根据此url的协议找到对应的扩展点,此时扩展点就是RegistryProtocol,但是,因为Protocol接口有两个包装类,一个是ProtocolFilterWrapper、ProtocolListenerWrapper,所以实际上在调用export方法时,会经过这两个包装类的export方法,但是在这两个包装类的export方法中都会Registry协议进行了判断,不会做过多处理,所以最终会直接调用到RegistryProtocol的export(Invoker originInvoker)方法 13. 在RegistryProtocol的export(Invoker originInvoker)方法中,主要完成了以下几件事情: 1. 生成监听器,监听动态配置中心此服务的参数数据的变化,一旦监听到变化,则重写服务URL,并且在服务导出时先重写一次服务URL 2. 拿到重写之后的URL之后,调用doLocalExport()进行服务导出,在这个方法中就会调用DubboProtocol的export方法去导出服务了,导出成功后将得到一个ExporterChangeableWrapper 1. 在DubboProtocol的export方法中主要要做的事情就是启动NettyServer,并且设置一系列的RequestHandler,以便在接收到请求时能依次被这些RequestHandler所处理 2. 这些RequestHandler在上文已经整理过了 3. 从originInvoker中获取注册中心的实现类,比如ZookeeperRegistry 4. 将重写后的服务URL进行简化,把不用存到注册中心去的参数去除 5. 把简化后的服务URL调用ZookeeperRegistry.registry()方法注册到注册中心去 6. 最后将ExporterChangeableWrapper封装为DestroyableExporter对象返回,完成服务导出 ### Exporter架构 一个服务导出成功后,会生成对应的Exporter: 1. DestroyableExporter:Exporter的最外层包装类,这个类的主要作用是可以用来unexporter对应的服务 2. ExporterChangeableWrapper:这个类主要负责在unexport对应服务之前,把服务URL从注册中心中移除,把该服务对应的动态配置监听器移除 3. ListenerExporterWrapper:这个类主要负责在unexport对应服务之后,把服务导出监听器移除 4. DubboExporter:这个类中保存了对应服务的Invoker对象,和当前服务的唯一标志,当NettyServer接收到请求后,会根据请求中的服务信息,找到服务对应的DubboExporter对象,然后从对象中得到Invoker对象 ### 服务端Invoker架构 1. ProtocolFilterWrapper$CallbackRegistrationInvoker:会去调用下层Invoker,下层Invoker执行完了之后,会遍历过滤器,查看是否有过滤器实现了ListenableFilter接口,如果有,则回调对应的onResponse方法,比如TimeoutFilter,当调用完下层Invoker之后,就会计算服务的执行时间 2. ProtocolFilterWrapper$1:ProtocolFilterWrapper中的过滤器组成的Invoker,利用该Invoker,可以执行服务端的过滤器,执行完过滤器之后,调用下层Invoker 3. RegistryProtocol$InvokerDelegate:服务的的委托类,里面包含了DelegateProviderMetaDataInvoker对象和服务对应的providerUrl,执行时直接调用下层Invoker 4. DelegateProviderMetaDataInvoker:服务的的委托类,里面包含了AbstractProxyInvoker对象和ServiceConfig对象,执行时直接调用下层Invoker 5. AbstractProxyInvoker:服务接口的代理类,绑定了对应的实现类,执行时会利用反射调用服务实现类实例的具体方法,并得到结果 ### 服务端请求执行流程(后续细讲)