// Give the balancer the current cluster state.
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
this.balancer.setClusterLoad(assignments);

// Ask the balancer, per table, whether balancing is needed; if so it returns
// a list of RegionPlans describing the moves to perform.
List<RegionPlan> plans = new ArrayList<>();
for (Entry<TableName, Map<ServerName, List<RegionInfo>>> e : assignments.entrySet()) {
  List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
  if (partialPlans != null) {
    plans.addAll(partialPlans);
  }
}

// Start executing the balance plans.
long balanceStartTime = System.currentTimeMillis();
long cutoffTime = balanceStartTime + this.maxBlancingTime;
int rpCount = 0; // number of RegionPlans balanced so far
if (plans != null && !plans.isEmpty()) {
  int balanceInterval = this.maxBlancingTime / plans.size();
  LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
      + balanceInterval + " ms, and the max number regions in transition is "
      + maxRegionsInTransition);

  for (RegionPlan plan : plans) {
    LOG.info("balance " + plan);
    // TODO: bulk assign
    try {
      // Execute the plan: asynchronously move the region off its current
      // RegionServer onto the planned destination server.
      this.assignmentManager.moveAsync(plan);
    } catch (HBaseIOException hioe) {
      // should ignore failed plans here, avoiding the whole balance plans be aborted
      // later calls of balance() can fetch up the failed and skipped plans
      LOG.warn("Failed balance plan: {}, just skip it", plan, hioe);
    }
    // rpCount records balance plans processed, does not care if a plan succeeds
    rpCount++;

    // if performing next balance exceeds cutoff time, exit the loop
    if (rpCount < plans.size() && System.currentTimeMillis() > cutoffTime) {
      // TODO: After balance, there should not be a cutoff time (keeping it as
      // a security net for now)
      LOG.debug("No more balancing till next balance run; maxBalanceTime="
          + this.maxBlancingTime);
      break;
    }
  }
}
} // NOTE(review): closes a scope opened before this excerpt — TODO confirm against full source
// If LoadBalancer did not generate any plans, it means the cluster is already balanced.
// Return true indicating a success.
return true;
}
// On clusters with lots of HFileLinks or lots of reference files, // instantiating the storefile infos can be quite expensive. // Allow turning this feature off if the locality cost is not going to // be used in any computations. RegionLocationFinderfinder=null; if ((this.localityCost != null && this.localityCost.getMultiplier() > 0) || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) { finder = this.regionFinder; }
//The clusterState that is given to this method contains the state //of all the regions in the table(s) (that's true today) // Keep track of servers to iterate through them. Clustercluster=newCluster(clusterState, loads, finder, rackManager);
// Should this be kept? // 模拟执行后是否有收益,如果有,则往下执行,否则回滚收益:即本次action不进行 if (newCost < currentCost) { currentCost = newCost;
// save for JMX curOverallCost = currentCost; for (inti=0; i < this.curFunctionCosts.length; i++) { curFunctionCosts[i] = tempFunctionCosts[i]; } } else { // Put things back the way they were before. // TODO: undo by remembering old values ActionundoAction= action.undoAction(); cluster.doAction(undoAction); updateCostsWithAction(cluster, undoAction); }
// update costs metrics updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); if (initCost > currentCost) { plans = createRegionPlans(cluster); LOG.info("Finished computing new load balance plan. Computation took {}" + " to try {} different iterations. Found a solution that moves " + "{} regions; Going from a computed cost of {}" + " to a new cost of {}", java.time.Duration.ofMillis(endTime - startTime), step, plans.size(), initCost, currentCost); return plans; } LOG.info("Could not find a better load balance plan. Tried {} different configurations in " + "{}, and did not find anything with a computed cost less than {}", step, java.time.Duration.ofMillis(endTime - startTime), initCost); returnnull; }
// Find a non-master regionserver to host the region if (keyIt == null || !keyIt.hasNext()) { keyIt = clusterMap.keySet().iterator(); } ServerNamedest= keyIt.next(); if (masterServerName.equals(dest)) { if (!keyIt.hasNext()) { keyIt = clusterMap.keySet().iterator(); } dest = keyIt.next(); }
// Move this region away from the master regionserver RegionPlanplan=newRegionPlan(region, masterServerName, dest); if (plans == null) { plans = newArrayList<>(); } plans.add(plan); } } for (Map.Entry<ServerName, List<RegionInfo>> server: clusterMap.entrySet()) { if (masterServerName.equals(server.getKey())) continue; for (RegionInfo region: server.getValue()) { if (!shouldBeOnMaster(region)) continue;
// Move this region to the master regionserver RegionPlanplan=newRegionPlan(region, server.getKey(), masterServerName); if (plans == null) { plans = newArrayList<>(); } plans.add(plan); } } return plans; }
// numMovedRegions的计算方式。 publicvoiddoAction(){ if (initialRegionIndexToServerIndex[region] == newServer) { numMovedRegions--; //region moved back to original location } elseif (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { numMovedRegions++; //region moved from original location } }
// It's possible that there aren't enough regions to go around // 最优分布的cost double min; if (count > total) { min = ((count - total) * mean) + ((1 - mean) * total); } else { // Some will have 1 more than everything else. intnumHigh= (int) (total - (Math.floor(mean) * count)); intnumLow= (int) (count - numHigh); min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
// 遍历所有的RS for (inti=0; i < stats.length; i++) { //Cost this server has from RegionLoad longcost=0;
// for every region on this server get the rl for(int regionIndex:cluster.regionsPerServer[i]) { // regionLoad,默认会取最近15次的流量统计做平均值。15次由StochasticLoadBalancer的numRegionLoadsToRemember参数配置。 // hbase.master.balancer.stochastic.numRegionLoadsToRemember,这个RegionLoad的更新策略在下面分析。 Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];
// Now if we found a region load get the type of cost that was requested. if (regionLoadList != null) { // 计算每个Region的Cost。核心代码在下面。 cost = (long) (cost + getRegionLoadCost(regionLoadList)); } }
// Add the total cost to the stats. // 计算每个RS的cost stats[i] = cost; }
// Now return the scaled cost from data held in the stats object. // 计算整个集群的cost,和上面RegionCount算法一致。 return costFromArray(stats); }
// 关注点:Write、Read等负载信息是如何统计出来的 privatesynchronizedvoidupdateRegionLoad() { // We create a new hashmap so that regions that are no longer there are removed. // However we temporarily need the old loads so we can use them to keep the rolling average. Map<String, Deque<BalancerRegionLoad>> oldLoads = loads; loads = newHashMap<>();