directory
Dubbo的目录服务简单来说就是消费者将自己能够调用的服务提供者的信息缓存到本地Directory中,当服务提供者有所变化时会通知到注册中心,消费者会监听注册中心相关服务的消息,当收到相关服务提供者变动的消息时会更新本地服务目录Directory,实现类RegistryDirectory中主要提供了两个功能接口:
(1)notify(List<URL> urls):当注册中心有通知变化时会通知服务消费者更新本地的服务目录
(2)List<Invoker<T>> doList(Invocation invocation):获取消费者能够远程调用的所有服务提供者列表
服务目录Directory简单来说就是维护了一个List,提供两个接口分别通知修改list和获取符合消费者要求的服务列表
接下来我们看看RegistryDirectory中的操作:
接口Directory
public interface Directory<T> extends Node {
/**
* get service type.
*
* @return service type.
*/
Class<T> getInterface();
/**
* list invokers.
*
* @return invokers
*/
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
在抽象类AbstractDirectory中实现了list函数,获取消费者对应的服务提供者,并根据路由规则筛选最终的服务提供者
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
//获取所有提供服务的调用者
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
//根据路由规则进一步过滤,不匹配的调用者会被过滤(路由相关的知识接下来介绍)
if (localRouters != null && !localRouters.isempty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getparameter(constants.runtime_KEY, false)) {
invokers = router.route(invokers, getconsumerUrl(), invocation);
}
} catch (throwable t) {
logger.ERROR("Failed to execute router: " + getUrl() + ", cause: " + t.getmessage(), t);
}
}
}
return invokers;
}
在RegistryDirectory中会根据method来选择服务消费者对应的服务提供者的Invokers
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
}
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
//获取服务提供者方法
String methodName = RpcUtils.getMethodName(invocation);
//参数数组
Object[] args = RpcUtils.getarguments(invocation);
//方法map中获取服务提供者
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
//返回服务提供者
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
接下来我们看看localMethodInvokerMap中的数据是如何初始化进去的,接下来我们看看RegistryDirectory实现的另外一个接口notify中的操作,服务提供者出现变化时注册中心会将消息通知到消息者,消费者收到通知消息会调用notify函数,完成消费者本地服务目录相关信息的刷新
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
//路由相关的url信息
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
//configurators相关的url信息
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
//服务提供者相关的url信息
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
//更新服务提供相关的Invoker
refreshInvoker(invokerUrls);
}
refreshInvoker在每次刷新消费者缓存中的invoker是会清除一下已经不存在注册中心中的数据 //注册中心中所有的invokerUrls
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
//清除所有的Invoker
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
//将注册中心的地址信息转为Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
//查找oldUrlInvokerMap中不存在newUrlInvokerMap中的删除掉
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
在destroyUnusedInvokers中会删除无用的Invoker
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
// check deleted invoker
List<String> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
//判断哪些Invoker不存在,添加到list中
for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<String>();
}
deleted.add(entry.getKey());
}
}
}
//循环调用销毁方法进行销毁
if (deleted != null) {
for (String url : deleted) {
if (url != null) {
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
try {
invoker.destroy();
if (logger.isDebugEnabled()) {
logger.debug("destory invoker[" + invoker.getUrl() + "] success. ");
}
} catch (Exception e) {
logger.warn("destory invoker[" + invoker.getUrl() + "] faild. " + e.getMessage(), e);
}
}
}
}
}
}
相关阅读
A5创业网(公众号:iadmin5)10.17日讯,近日海淀法院审结了上海文化广播影视集团有限公司诉北京搜狐互联网信息服务有限公司、飞狐信
RadioGroup的RadioButton简单用法——学习笔记
关于RadioButton,它的具体例子嘛,就好像是QQ啊、微信之类的app底部那几个按钮,不过他们是不是用RadioButton来实现的我不太清楚,但是R
A5创业网(公众号:iadmin5)1月24日讯,虽然大家都知道,滴滴在2018年宣布无限期下线顺风车业务,进行整改。但是随着春运的开始,各方出行
服务宝是淘宝向消费者推出的适用于手机类目的增值服务,服务旨在解决消费者购买适用产品时对商品的顾虑和质量保修问题,也是变相提高
近日网速还算可以,于是乎决定下载一下Android源代码以供在家研究学习。下载之前先认识一下repo,整个Android源码是由很多个git项目