2018年(30)
分类: Java
2018-12-22 11:28:44
前言
本文分析dubbo的register层;此层封装服务地址的注册与发现,以服务URL为中心,扩展接口为RegistryFactory, Registry, RegistryService;
Registry接口
接口定义如下:
publicinterfaceRegistryextendsNode,RegistryService{ }publicinterfaceRegistryService{voidregister(URL url);voidunregister(URL url);voidsubscribe(URL url, NotifyListener listener);voidunsubscribe(URL url, NotifyListener listener);Listlookup(URL url); }主要提供了注册(register),注销(unregister),订阅(subscribe),退订(unsubscribe)等功能;dubbo提供了多种注册方式分别是:Multicast ,Zookeeper,Redis以及Simple方式;
Multicast:Multicast注册中心不需要启动任何中心节点,只要广播地址一样,就可以互相发现;
Zookeeper:Zookeeper是Apacahe Hadoop的子项目,是一个树型的目录服务,支持变更推送,适合作为Dubbo服务的注册中心,工业强度较高,可用于生产环境,并推荐使用;
Redis:基于Redis实现的注册中心,使用 Redis的Publish/Subscribe事件通知数据变更;
Simple:Simple注册中心本身就是一个普通的Dubbo服务,可以减少第三方依赖,使整体通讯方式一致;
后面重点介绍官方推荐的Zookeeper注册方式;具体的Register是在RegistryFactory中生成的,具体看一下接口定义;
RegistryFactory接口
接口定义如下:
@SPI("dubbo")publicinterfaceRegistryFactory{@Adaptive({"protocol"})Registry getRegistry(URL url); }RegistryFactory提供了SPI扩展,默认使用dubbo,具体有哪些扩展可以查看META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory:
dubbo=com.alibaba.dubbo.registry.dubbo.DubboRegistryFactorymulticast=com.alibaba.dubbo.registry.multicast.MulticastRegistryFactoryzookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactoryredis=com.alibaba.dubbo.registry.redis.RedisRegistryFactory已推荐的Zookeeper为实例,查看ZookeeperRegistryFactory,提供了createRegistry方法:
privateZookeeperTransporter zookeeperTransporter;publicRegistrycreateRegistry(URL url){returnnewZookeeperRegistry(url, zookeeperTransporter); }实例化ZookeeperRegistry,两个参数分别是url和zookeeperTransporter,zookeeperTransporter是操作Zookeeper的客户端组件包括:zkclient和curator两种方式
@SPI("curator")publicinterfaceZookeeperTransporter{@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})ZookeeperClient connect(URL url); }ZookeeperTransporter同样提供了SPI扩展,默认使用curator方式;接下来重点看一下Zookeeper注册中心。
Zookeeper注册中心
1.整体设计流程
在dubbo的整体设计中,可以大致查看Registry层的大致流程,首先通过RegistryFactory实例化Registry,Registry可以接收RegistryProtocol传过来的注册(register)和订阅(subscribe)消息,然后Registry通过ZKClient来向Zookeeper指定的目录下写入url信息,如果是订阅消息Registry会通过NotifyListener来通知RegitryDirctory进行更新url,最后就是Cluster层通过路由,负载均衡选择具体的提供方;
2.Zookeeper注册中心流程
官方提供了dubbo在Zookeeper中心的流程图:
流程说明:
服务提供者启动时: 向/dubbo/com.foo.BarService/providers目录下写入自己的URL地址;
服务消费者启动时: 订阅/dubbo/com.foo.BarService/providers目录下的提供者URL地址;并向/dubbo/com.foo.BarService/consumers目录下写入自己的URL地址;
监控中心启动时: 订阅/dubbo/com.foo.BarService 目录下的所有提供者和消费者URL地址。
下面分别从注册(register),注销(unregister),订阅(subscribe),退订(unsubscribe)四个方面来分析
3.注册(register)
ZookeeperRegistry的父类FailbackRegistry中实现了register方法,FailbackRegistry从名字可以看出来具有:失败自动恢复,后台记录失败请求,定时重发功能;下面具体看一下register方法:
publicvoidregister(URL url){super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url);try{// Sending a registration request to the server sidedoRegister(url); }catch(Exception e) { Throwable t = e;// If the startup detection is opened, the Exception is thrown directly.booleancheck = getUrl().getParameter(Constants.CHECK_KEY,true) && url.getParameter(Constants.CHECK_KEY,true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());booleanskipFailback = tinstanceofSkipFailbackWrapperException;if(check || skipFailback) {if(skipFailback) { t = t.getCause(); }thrownewIllegalStateException("Failed to register "+ url +" to registry "+ getUrl().getAddress() +", cause: "+ t.getMessage(), t); }else{ logger.error("Failed to register "+ url +", waiting for retry, cause: "+ t.getMessage(), t); }// Record a failed registration request to a failed list, retry regularlyfailedRegistered.add(url); } }后台记录了失败的请求,包括failedRegistered和failedUnregistered,注册的时候将里面存放的url删除,然后执行doRegister方法,此方式在ZookeeperRegistry中实现,主要是在Zookeeper指定的目录下写入url信息,如果失败会记录注册失败的url,等待自动恢复;doRegister相关代码如下:
protectedvoiddoRegister(URL url){try{ zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY,true)); }catch(Throwable e) {thrownewRpcException("Failed to register "+ url +" to zookeeper "+ getUrl() +", cause: "+ e.getMessage(), e); } }调用zkClient的create方法在Zookeeper上创建节点,默认创建临时节点,create方法在AbstractZookeeperClient中实现,具体源码如下:
publicvoidcreate(String path,booleanephemeral){if(!ephemeral) {if(checkExists(path)) {return; } }inti = path.lastIndexOf('/');if(i >0) { create(path.substring(0, i),false); }if(ephemeral) { createEphemeral(path); }else{ createPersistent(path); } }path指定需要创建的目录,ephemeral指定是否是创建临时节点,并且提供了递归创建目录,除了叶子目录其他目录都是持久化的;可以发现不管是创建临时目录还是持久化目录,都没有指定目录的Data,所有使用的是默认值,也就是本地ip地址;实例中创建的目录如下:
/dubbo/com.dubboApi.DemoService/providers/dubbo%3A%2F%2F10.13.83.7%3A20880%2Fcom.dubboApi.DemoService%3Fanyhost%3Dtrue%26application%3Dhello-world-app%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.dubboApi.DemoService%26methods%3DsyncSayHello%2CsayHello%2CasyncSayHello%26pid%3D13252%26serialization%3Dprotobuf%26side%3Dprovider%26timestamp%3D1545297239027dubbo是一个根节点,然后是service名称,providers是固定的一个类型,如果是消费端这里就是consumers,最后就是一个临时节点;使用临时节点的目的就是提供者出现断电等异常停机时,注册中心能自动删除提供者信息;可以通过如下方法查询当前的目录节点信息:
publicclassCuratorTest{staticStringpath ="/dubbo";staticCuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(newExponentialBackoffRetry(1000,3)).build(); publicstaticvoidmain(String[] args) throws Exception { client.start();Listpaths = listChildren(path);for(Stringpath : paths) { Stat stat =newStat(); System.err.println("path:"+ path +",value:"+newString(client.getData().storingStatIn(stat).forPath(path))); } } privatestaticList listChildren(Stringpath) throws Exception {List pathList =newArrayList (); pathList.add(path);List list = client.getChildren().forPath(path);if(list !=null&& list.size() >0) {for(StringcPath : list) {Stringtemp ="";if("/".equals(path)) { temp = path + cPath; }else{ temp = path +"/"+ cPath; } pathList.addAll(listChildren(temp)); } }returnpathList; } } 递归遍历/dubbo目录下的所有子目录,同时将节点存储的数据都查询出来,结果如下:
path:/dubbo,value:10.13.83.7path:/dubbo/com.dubboApi.DemoService,value:10.13.83.7path:/dubbo/com.dubboApi.DemoService/configurators,value:10.13.83.7path:/dubbo/com.dubboApi.DemoService/providers,value:10.13.83.7path:/dubbo/com.dubboApi.DemoService/providers/dubbo%3A%2F%2F10.13.83.7%3A20880%2Fcom.dubboApi.DemoService%3Fanyhost%3Dtrue%26application%3Dhello-world-app%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.dubboApi.DemoService%26methods%3DsyncSayHello%2CsayHello%2CasyncSayHello%26pid%3D4712%26serialization%3Dprotobuf%26side%3Dprovider%26timestamp%3D1545358401966,value:10.13.83.7除了最后一个节点是临时节点,其他都是持久化的;
4.注销(unregister)
同样在父类FailbackRegistry中实现了unregister方法,代码如下:
publicvoidunregister(URL url){super.unregister(url); failedRegistered.remove(url); failedUnregistered.remove(url);try{// Sending a cancellation request to the server sidedoUnregister(url); }catch(Exception e) { Throwable t = e;// If the startup detection is opened, the Exception is thrown directly.booleancheck = getUrl().getParameter(Constants.CHECK_KEY,true) && url.getParameter(Constants.CHECK_KEY,true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());booleanskipFailback = tinstanceofSkipFailbackWrapperException;if(check || skipFailback) {if(skipFailback) { t = t.getCause(); }thrownewIllegalStateException("Failed to unregister "+ url +" to registry "+ getUrl().getAddress() +", cause: "+ t.getMessage(), t); }else{ logger.error("Failed to uregister "+ url +", waiting for retry, cause: "+ t.getMessage(), t); }// Record a failed registration request to a failed list, retry regularlyfailedUnregistered.add(url); } }注销时同样删除了failedRegistered和failedUnregistered存放的url,然后调用doUnregister,删除Zookeeper中的目录节点,失败的情况下会存储在failedUnregistered中,等待重试;
protectedvoiddoUnregister(URL url){try{ zkClient.delete(toUrlPath(url)); }catch(Throwable e) {thrownewRpcException("Failed to unregister "+ url +" to zookeeper "+ getUrl() +", cause: "+ e.getMessage(), e); } }//CuratorZookeeperClient删除操作publicvoiddelete(String path){try{ client.delete().forPath(path); }catch(NoNodeException e) { }catch(Exception e) {thrownewIllegalStateException(e.getMessage(), e); } }直接使用CuratorZookeeperClient中的delete方法删除临时节点;
5.订阅(subscribe)
服务消费者启动时,会先向Zookeeper注册消费者节点信息,然后订阅…/providers目录下提供者的URL地址;消费端也同样需要注册节点信息,是因为监控中心需要对服务端和消费端都进行监控;下面重点看一下订阅的相关代码,在父类FailbackRegistry中实现了subscribe方法:
publicvoidsubscribe(URL url, NotifyListener listener){super.subscribe(url, listener); removeFailedSubscribed(url, listener);try{// Sending a subscription request to the server sidedoSubscribe(url, listener); }catch(Exception e) { Throwable t = e; List urls = getCacheUrls(url);if(urls !=null&& !urls.isEmpty()) { notify(url, listener, urls); logger.error("Failed to subscribe "+ url +", Using cached list: "+ urls +" from cache file: "+ getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") +"/dubbo-registry-"+ url.getHost() +".cache") +", cause: "+ t.getMessage(), t); }else{// If the startup detection is opened, the Exception is thrown directly.booleancheck = getUrl().getParameter(Constants.CHECK_KEY,true) && url.getParameter(Constants.CHECK_KEY,true);booleanskipFailback = tinstanceofSkipFailbackWrapperException;if(check || skipFailback) {if(skipFailback) { t = t.getCause(); }thrownewIllegalStateException("Failed to subscribe "+ url +", cause: "+ t.getMessage(), t); }else{ logger.error("Failed to subscribe "+ url +", waiting for retry, cause: "+ t.getMessage(), t); } }// Record a failed registration request to a failed list, retry regularlyaddFailedSubscribed(url, listener); } }类似的格式,同样存储了失败了订阅url信息,重点看ZookeeperRegistry中的doSubscribe方法:
privatefinalConcurrentMap zkListeners =newConcurrentHashMap(); protectedvoiddoSubscribe(finalURL url,finalNotifyListener listener) {try{if(Constants.ANY_VALUE.equals(url.getServiceInterface())) {Stringroot = toRootPath(); ConcurrentMap listeners = zkListeners.get(url);if(listeners ==null) { zkListeners.putIfAbsent(url,newConcurrentHashMap()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener);if(zkListener ==null) { listeners.putIfAbsent(listener,newChildListener() {@OverridepublicvoidchildChanged(StringparentPath,ListcurrentChilds) {for(Stringchild : currentChilds) { child = URL.decode(child);if(!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY,String.valueOf(false)), listener); } } } }); zkListener = listeners.get(listener); } zkClient.create(root,false);List services = zkClient.addChildListener(root, zkListener);if(services !=null&& !services.isEmpty()) {for(Stringservice : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY,String.valueOf(false)), listener); } } }else{Listurls =newArrayList();for(Stringpath : toCategoriesPath(url)) { ConcurrentMap listeners = zkListeners.get(url);if(listeners ==null) { zkListeners.putIfAbsent(url,newConcurrentHashMap()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener);if(zkListener ==null) { listeners.putIfAbsent(listener,newChildListener() {@OverridepublicvoidchildChanged(StringparentPath,List currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } zkClient.create(path,false);List children = zkClient.addChildListener(path, zkListener);if(children !=null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } }catch(Throwable e) {thrownewRpcException("Failed to subscribe "+ url +" to zookeeper "+ getUrl() +", cause: "+ e.getMessage(), e); } } 在ZookeeperRegistry中定义了一个zkListeners变量,每个URL对应了一个map;map里面分别是NotifyListener和ChildListener的对应关系,消费端订阅时这里的NotifyListener其实就是RegistryDirectory,ChildListener是一个内部类,用来在监听的节点发生变更时,通知对应的消费端,具体的监听处理是在zkClient.addChildListener中实现的:
publicListaddChildListener(String path, final ChildListener listener){ ConcurrentMap listeners = childListeners.get(path);if(listeners ==null) { childListeners.putIfAbsent(path,newConcurrentHashMap()); listeners = childListeners.get(path); } TargetChildListener targetListener = listeners.get(listener);if(targetListener ==null) { listeners.putIfAbsent(listener, createTargetChildListener(path, listener)); targetListener = listeners.get(listener); }returnaddTargetChildListener(path, targetListener); }publicCuratorWatchercreateTargetChildListener(String path, ChildListener listener){returnnewCuratorWatcherImpl(listener); }publicListaddTargetChildListener(String path, CuratorWatcher listener){try{returnclient.getChildren().usingWatcher(listener).forPath(path); }catch(NoNodeException e) {returnnull; }catch(Exception e) {thrownewIllegalStateException(e.getMessage(), e); } }privateclassCuratorWatcherImplimplementsCuratorWatcher{privatevolatileChildListener listener;publicCuratorWatcherImpl(ChildListener listener){this.listener = listener; }publicvoidunwatch(){this.listener =null; } @Overridepublicvoidprocess(WatchedEventevent) throws Exception{if(listener !=null) { String path =event.getPath() ==null?"":event.getPath(); listener.childChanged(path, StringUtils.isNotEmpty(path) ? client.getChildren().usingWatcher(this).forPath(path) : Collections.emptyList()); } } }CuratorWatcherImpl实现了Zookeeper的监听接口CuratorWatcher,用来在节点有变更时通知对应的ChildListener,这样ChildListener就可以通知RegistryDirectory进行更新数据;
6.退订(unsubscribe)
在父类FailbackRegistry中实现了unsubscribe方法
publicvoidunsubscribe(URL url, NotifyListener listener){super.unsubscribe(url, listener); removeFailedSubscribed(url, listener);try{// Sending a canceling subscription request to the server sidedoUnsubscribe(url, listener); }catch(Exception e) { Throwable t = e;// If the startup detection is opened, the Exception is thrown directly.booleancheck = getUrl().getParameter(Constants.CHECK_KEY,true) && url.getParameter(Constants.CHECK_KEY,true);booleanskipFailback = tinstanceofSkipFailbackWrapperException;if(check || skipFailback) {if(skipFailback) { t = t.getCause(); }thrownewIllegalStateException("Failed to unsubscribe "+ url +" to registry "+ getUrl().getAddress() +", cause: "+ t.getMessage(), t); }else{ logger.error("Failed to unsubscribe "+ url +", waiting for retry, cause: "+ t.getMessage(), t); }// Record a failed registration request to a failed list, retry regularlySet listeners = failedUnsubscribed.get(url);if(listeners ==null) { failedUnsubscribed.putIfAbsent(url,newConcurrentHashSet()); listeners = failedUnsubscribed.get(url); } listeners.add(listener); } }同样使用failedUnsubscribed用来存储失败退订的url,具体看ZookeeperRegistry中的doUnsubscribe方法
protectedvoiddoUnsubscribe(URL url, NotifyListener listener){ ConcurrentMap listeners = zkListeners.get(url);if(listeners !=null) { ChildListener zkListener = listeners.get(listener);if(zkListener !=null) {if(Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); zkClient.removeChildListener(root, zkListener); }else{for(String path : toCategoriesPath(url)) { zkClient.removeChildListener(path, zkListener); } } } } }退订就比较简单了,只需要移除监听器就可以了;
7.失败重试
FailbackRegistry从名字可以看出来具有:失败自动恢复,后台记录失败请求,定时重发功能;在FailbackRegistry的构造器中启动了一个定时器:
this.retryFuture = retryExecutor.scheduleWithFixedDelay(newRunnable() {@Overridepublicvoidrun(){// Check and connect to the registrytry{ retry(); }catch(Throwable t) {// Defensive fault tolerancelogger.error("Unexpected error occur at failed retry, cause: "+ t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);实例化了一个间隔5秒执行一次重试的定时器,retry部分代码如下:
protectedvoidretry(){if(!failedRegistered.isEmpty()) { Set failed =newHashSet(failedRegistered);if(failed.size() >0) {if(logger.isInfoEnabled()) { logger.info("Retry register "+ failed); }try{for(URL url : failed) {try{ doRegister(url); failedRegistered.remove(url); }catch(Throwable t) {// Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry register "+ failed +", waiting for again, cause: "+ t.getMessage(), t); } } }catch(Throwable t) {// Ignore all the exceptions and wait for the next retrylogger.warn("Failed to retry register "+ failed +", waiting for again, cause: "+ t.getMessage(), t); } } } ...省略... }定期检查是否存在失败的注册(register),注销(unregister),订阅(subscribe),退订(unsubscribe)URL,如果存在则重试;
总结
本文首先介绍了RegistryFactory, Registry, RegistryService几个核心接口,然后以Zookeeper为注册中心重点介绍了注册(register),注销(unregister),订阅(subscribe),退订(unsubscribe)方式。 合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!