对于Dubbo的容错主要是有两层。第一层是mock,第二层是用户配置的容错策略。对于集群容错的包装逻辑入口就在于RegistryProtocol的doRefer()方法最后的cluster.join(directory),该方法返回了集群包装过后的invoker,这里的cluser其实就是MockClusterWrapper(至于为什么能确定是MockClusterInvoker,就需要大家去理解一下Dubbo的SPI机制了),下面一起来看一下MockClusterInvoke的具体内容: //真正起作用的是MockClusterInvoker public <T> Invoker<T> join(Directory<T> directory) throws RpcException { //因为cluster下面之后一层包装,所以这里的this.cluser就是默认的FialoverCluster return new MockClusterInvoker<T>(directory, this.cluster.join(directory)); } //MockClusterInvoker.invoke() public Result invoke(Invocation invocation) throws RpcException { Result result = null; //获取URL中配置的mock参数 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")){ //如果没有配置mock的话就直接进行调用 result = this.invoker.invoke(invocation); //如果配置了强制的mock就直接调用mock,不走正常调用逻辑 } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } result = doMockInvoke(invocation, null); } else { //调用失败之后再进行mock操作 try { result = this.invoker.invoke(invocation); }catch (RpcException e) { //mock并不会处理义务异常 if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; } //主要是挑选出可用的MockInvoker类然后调用其invoke方法返回结果 private Result doMockInvoke(Invocation invocation,RpcException e){ Result result = null; Invoker<T> minvoker ; //选取可用的MockInvoker List<Invoker<T>> mockInvokers = selectMockInvoker(invocation); if (mockInvokers == null || mockInvokers.size() == 0){ minvoker = (Invoker<T>) new MockInvoker(directory.getUrl()); } else { minvoker = mockInvokers.get(0); } try { result = minvoker.invoke(invocation); } catch (RpcException me) { //如果是业务异常就封装结果(注意这里和上面的区别),因为biz异常是用户自己自己在mock信息中配置的异常,不是预想之外的异常 if (me.isBiz()) { result = new RpcResult(me.getCause()); } else { throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause()); } } catch (Throwable me) { throw new RpcException(getMockExceptionMessage(e, me), me.getCause()); } return result; } //核心最后还是调用了MockInvoker的invoker方法 public Result invoke(Invocation invocation) throws RpcException { String mock = getUrl().getParameter(invocation.getMethodName()+"."+Constants.MOCK_KEY); if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(this); } if (StringUtils.isBlank(mock)){ mock = getUrl().getParameter(Constants.MOCK_KEY); } if (StringUtils.isBlank(mock)){ //这个错误比较常见,原因就在于客户端调用的时候返回的异常信息是非业务异常,但是客户端又没有配置mock信息,因此就会抛出这个异常 throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url)); } //解析出mock的类型,如果是mock=fail:AA,就返回AA,如果是mock=xx.Service就返回xx.Service,如果是mock=force:XX,就返回XX mock = normallizeMock(URL.decode(mock)); //如果配置的是:mock=fail:return,就直接返回空结果 if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){ RpcResult result = new RpcResult(); result.setValue(null); return result; //如果配置的是mock=fail:return **,就解析**为对应的可返回内容然后返回 } else if (mock.startsWith(Constants.RETURN_PREFIX)) { mock = mock.substring(Constants.RETURN_PREFIX.length()).trim(); mock = mock.replace('`', '"'); try { Type[] returnTypes = RpcUtils.getReturnTypes(invocation); Object value = parseMockValue(mock, returnTypes); return new RpcResult(value); } catch (Exception ew) { throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew); } //如果配置的是mock=fail:throw **(用户自定义的异常信息),就解析**为对应的可返回内容然后返回 } else if (mock.startsWith(Constants.THROW_PREFIX)) { mock = mock.substring(Constants.THROW_PREFIX.length()).trim(); mock = mock.replace('`', '"'); if (StringUtils.isBlank(mock)){ throw new RpcException(" mocked exception for Service degradation. "); } else { //用户自定义类 Throwable t = getThrowable(mock); throw new RpcException(RpcException.BIZ_EXCEPTION, t); } } else { //如果mock信息为ServiceMock的话就直接找到对应的Mock类进行mock调用,然后返回结果 try { Invoker<T> invoker = getInvoker(mock); return invoker.invoke(invocation); } catch (Throwable t) { throw new RpcException("Failed to create mock implemention class " + mock , t); } } }
方法级别的容错总体来说是针对业务异常的一种容错,而针对非业务异常的容错逻辑就是另外一个概念了,比如说由于网络抖动导致某次调用没有成功,针对类似的异常Dubbo也有自己的容错措施,具体如下面几种: Failover Cluster 失败自动切换,当出现失败,重试其它服务器。(缺省) 通常用于读操作,但重试会带来更长延迟。 Failfast Cluster 快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。 Failsafe Cluster 失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。 Failback Cluster 失败自动恢复,后台记录失败请求,定时重发。 通常用于消息通知操作。 Forking Cluster 并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过forks="2"来设置最大并行数。 Broadcast Cluster 广播调用所有提供者,逐个调用,任意一台报错则报错。(2.1.0开始支持) 通常用于通知所有提供者更新缓存或日志等本地资源信息。
Failover public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; //检查copyinvokers是否为null checkInvokers(copyinvokers, invocation); //重试次数,默认为3次,不包含第一次调用 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // last exception. RpcException le = null; //已经调用的Invoker列表 List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重试时,进行重新选择,避免重试时invoker列表已发生变化. //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变 if (i > 0) { checkWheatherDestoried(); copyinvokers = list(invocation); //重新检查一下有没有对应的提供者 checkInvokers(copyinvokers, invocation); } //通过loadbalance去选出目标Invoker //这里默认的LoadBalance是RandomLoadBalance,选择时候是根据权重来选择目标的Invoker,当然也可以配置其他的LoadBalance Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); //添加到上下文环境中去,但是这里为什么会把失败的invoker也加进来,感觉失败的Invoker信息并没有什么意义 RpcContext.getContext().setInvokers((List)invoked); try { //这里才是最后的调用,使用经过loadbalance选出的invoker去调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { //业务异常不会重试,直接抛出 if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
Failfast //快速失败的逻辑最简单了,就是什么都不做,有调用异常的话就直接往上抛出 public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { return invoker.invoke(invocation); //简单区分异常类型 } catch (Throwable e) { if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } }
Dubbo的负载均衡策略主要有以下几种: Random LoadBalance 随机选取服务提供者 最简单的无状态的负载均衡算法 RoundRobin LoadBalance 以轮训的方式调用服务提供者 缺点是有状态,必须在并发之下记住上一次到谁了 LeastActive LoadBalance 调用最少活跃调调用的服务提供者,这里有一点需要注意,这里的服务调用统计维度是在方法级别的,也就是说是方法级别的LoadBalance。 ConsistentHash LoadBalance 一致性Hash(用的不多,不多讲解)
首先看下LoadBalance的接口就直到它是做什么的: //从invokers列表中根据url和invocation信息选出一个合适的Invoker供consumer端调用 <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
在所有的LoadBalance中都提到了一个概念:weight。正常情况下我们无论配置在provider还是service中,对应的所有服务端的providerUrl的weight都是一样的,这种情况其实weight配置不配置意义不大。但是一旦动态针对某个服务调整过weight值,这个影响就出来了。例如:override:// 配置之后,就将10.20.153.10服务器上的com.foo.BarServic的权值改为了200,那么它在之后的LoadBalance中被选取的可能性就更大了(默认的权值为100)。
Random LoadBalance private final Random random = new Random(); protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 总个数 int totalWeight = 0; // 总权重 boolean sameWeight = true; // 权重是否都一样 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // 累计总权重 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; // 计算所有权重是否一样 } } if (totalWeight > 0 && ! sameWeight) { // 如果权重不相同且权重大于0则按总权重数随机 int offset = random.nextInt(totalWeight); // 随机数落到哪个片段上,就取哪个片段对应的Invoker对象(击鼓传花式往后切割) for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // 如果权重相同或权重为0则均等随机 return invokers.get(random.nextInt(length)); }
RoundRobin LoadBalance //轮训是针对方法级别的,并不是所有服务调用 protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); // 总个数 int maxWeight = 0; // 最大权重 int minWeight = Integer.MAX_VALUE; // 最小权重 // invoker->weight,IntegerWrapper就是一个简单的Integer包装类 final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // 累计最大权重 minWeight = Math.min(minWeight, weight); // 累计最小权重 if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } //currentSequence代表某个方法是第多少次被调用的,例如第1W次 int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样 // 可以把weightSum理解成一个权重范围内的总集,mod就带表在这个总集中具体执行到的位置 int mod = currentSequence % weightSum; //weightSum < maxWeight*length for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); //这里的逻辑比较抽象,本质上就是谁的权重越大,轮询到谁的次数就越多 if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } } // 取模轮循 return invokers.get(currentSequence % length); }
LeastActive LoadBalance private final Random random = new Random(); protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // 总个数 int leastActive = -1; // 最小的活跃数 int leastCount = 0; // 相同最小活跃数的个数 int[] leastIndexs = new int[length]; // 相同最小活跃数的下标 int totalWeight = 0; // 总权重 int firstWeight = 0; // 第一个权重,用于于计算是否相同 boolean sameWeight = true; // 是否所有权重相同 for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重 if (leastActive == -1 || active < leastActive) { // 如果是初始情况下或者某台机器的active数量小于现在保存的leastActive数量,就会重新开始 leastActive = active; // 记录最小活跃数 leastCount = 1; // 重新统计相同最小活跃数的个数 leastIndexs[0] = i; // 重新记录最小活跃数下标 totalWeight = weight; // 重新累计总权重 firstWeight = weight; // 记录第一个权重 sameWeight = true; // 还原权重相同标识 } else if (active == leastActive) { // 累计相同最小的活跃数 leastIndexs[leastCount ++] = i; // 累计相同最小活跃数下标 totalWeight += weight; // 累计总权重 // 判断所有权重是否一样 if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // 如果只有一个最小值的话就直接调用 if (leastCount == 1) { // 如果只有一个最小则直接返回 return invokers.get(leastIndexs[0]); } if (! sameWeight && totalWeight > 0) { // 如果权重不相同且权重大于0则按总权重数随机 int offsetWeight = random.nextInt(totalWeight); // 并确定随机值落在哪个片断上 for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // 如果权重相同或权重为0则均等随机 return invokers.get(leastIndexs[random.nextInt(leastCount)]);
我个人对于Dubbo 提供的LoadBalance的看法是:基本上满足日常使用,但是应该更加丰富。因为我们日常使用的是leastactive,但是因为该负载均衡策略是基于方法级别的,所以无法控制其他的方法对于应用的影响,这里如果将统计的维度上升到机器纬度,其实可以做到类似于整个集群的leastactive,这样的话就不容易出现部分几台机器负载特别高,而其余的大部分机器都有很多资源结余。当然也可以提供接口纬度的负载均衡,这个完全可以根据具体的业务实现定值,因为SPI机制的缘故,自定义负载均衡策略实现起来还是比较方便的。

服务调用信息,如:method, argument 等 (暂不支持参数路由) URL本身的字段,如:protocol, host, port 等 以及URL上的所有参数,如:application, organization 等
等号"="表示"匹配",如:host = 不等号"!="表示"不匹配",如:host !=
以逗号","分隔多个值,如:host !=, 以星号" "结尾,表示通配,如:host != 10.20. 以美元符"$"开头,表示引用消费者参数,如:host = $host
下面以ConditionRouter为例来看一下路由规则的具体实现 //构造函数初始化的时候难点在于when和then的初始化: public ConditionRouter(URL url) { this.url = url; //路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为0。 this.priority = url.getParameter(Constants.PRIORITY_KEY, 0); //当路由结果为空时,是否强制执行,如果不强制执行,路由结果为空的路由规则将自动失效,可不填,缺省为flase this.force = url.getParameter(Constants.FORCE_KEY, false); try { String rule = url.getParameterAndDecoded(Constants.RULE_KEY); if (rule == null || rule.trim().length() == 0) { throw new IllegalArgumentException("Illegal route rule!"); } rule = rule.replace("consumer.", "").replace("provider.", ""); int i = rule.indexOf("=>"); // =>前的部分 String whenRule = i < 0 ? null : rule.substring(0, i).trim(); // =>后的部分 String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim(); Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule); Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule); // NOTE: When条件是允许为空的,外部业务来保证类似的约束条件 this.whenCondition = when; this.thenCondition = then; } catch (ParseException e) { throw new IllegalStateException(e.getMessage(), e); } } /** * 将对应的rule信息解析为对应的MatchPair * host=解析出来就是一个host:MatchPair,Matcher的matches内容为10.20.153.10 * host!=解析出来就是一个host:MatchPair,Matcher的mismatches内容为10.20.153.10 * 可以理解为MatcherPair就是区分matches和mismatches的具体聚合类,拿到这个Matcher就拿到表达式初步解析后的数据 * @param rule * @return * @throws ParseException */ private static Map<String, MatchPair> parseRule(String rule) throws ParseException { Map<String, MatchPair> condition = new HashMap<String, MatchPair>(); if(StringUtils.isBlank(rule)) { return condition; } // 匹配或不匹配Key-Value对 MatchPair pair = null; // 多个Value值 Set<String> values = null; final Matcher matcher = ROUTE_PATTERN.matcher(rule); //例如:host= 第一次匹配的group1='',group2='host',第二次匹配的group1='=',group2='' while (matcher.find()) { // 逐个匹配 String separator = matcher.group(1); String content = matcher.group(2); // 表达式开始 if (separator == null || separator.length() == 0) { pair = new MatchPair(); //'host':new MatchPair() condition.put(content, pair); } // &符号还没有遇到过 else if ("&".equals(separator)) { if (condition.get(content) == null) { pair = new MatchPair(); condition.put(content, pair); } else { condition.put(content, pair); } } // 匹配=号部分 else if ("=".equals(separator)) { if (pair == null) throw new RuntimeException(); values = pair.matches; values.add(content); } // 匹配!=号部分 else if ("!=".equals(separator)) { if (pair == null) throw new RuntimeException(); values = pair.mismatches; values.add(content); } // ,号直接跟着前面的=或者!=走 else if (",".equals(separator)) { if (values == null || values.size() == 0) throw new RuntimeException(); values.add(content); } else { throw new RuntimeException(); } } return condition; } public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (invokers == null || invokers.size() == 0) { return invokers; } try { //如果没有匹配到的前置条件后直接返回,意思就是当前作用的consumer信息不需要经过路由操作 // 如果路由的配置值有$开头的话就将其替换为URL中对应的key的value if (! matchWhen(url)) { return invokers; } List<Invoker<T>> result = new ArrayList<Invoker<T>>(); //黑名单 if (thenCondition == null) { logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey()); return result; } for (Invoker<T> invoker : invokers) { //逐个匹配后置条件 if (matchThen(invoker.getUrl(), url)) { result.add(invoker); } } if (result.size() > 0) { return result; //感觉强制执行的话返回一个空的List并没有卵用呀 } else if (force) { logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY)); return result; } } catch (Throwable t) { logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t); } return invokers; }






