Preface
Load balancing means distributing requests across multiple units in a cluster for execution, mainly to improve fault tolerance and to increase the system's capacity to process data.
In Dubbo, a service call is a step-by-step filtering of all available invokers until the Invoker for the specific call is selected. First, the list of all invokers is obtained from the Directory; then routing filters out the invokers that match the rules; finally, load balancing selects the specific Invoker. The Dubbo load balancing mechanism therefore determines which provider serves a given call.
Overall structure
The entry point for analysing Dubbo load balancing is the abstract class org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance. Start with the inheritance relationship of this class.
It is extended by RandomLoadBalance, LeastActiveLoadBalance, RoundRobinLoadBalance and ConsistentHashLoadBalance. These four classes implement the four load balancing algorithms provided in Dubbo.
Name | Description |
---|---|
RandomLoadBalance | Random algorithm, set random probability according to weight |
LeastActiveLoadBalance | Least-active algorithm: the active number is the difference between the number of requests started and the number completed, so providers that respond faster receive more requests |
RoundRobinLoadBalance | Weighted round-robin algorithm: sets the round-robin proportion according to weight |
ConsistentHashLoadBalance | Hash consistency algorithm: the same request parameters are assigned to the same provider |
The above are the four load balancing algorithms provided by Dubbo.
As the inheritance diagram shows, AbstractLoadBalance implements the LoadBalance interface, which is an SPI interface. Its @SPI annotation specifies RandomLoadBalance, the random algorithm, as the default implementation.
The abstract class AbstractLoadBalance implements the general load balancing logic and declares an abstract method in which each subclass implements its own strategy.
    public abstract class AbstractLoadBalance implements LoadBalance {

        /**
         * @param uptime running time (ms)
         * @param warmup warm-up time (ms)
         * @param weight configured weight of the Invoker
         */
        static int calculateWarmupWeight(int uptime, int warmup, int weight) {
            // Calculate the weight during the warm-up period
            int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
            // The returned weight ranges from 1 to weight
            return ww < 1 ? 1 : (ww > weight ? weight : ww);
        }

        @Override
        public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // Check whether invokers is empty
            if (CollectionUtils.isEmpty(invokers)) {
                return null;
            }
            // If only one Invoker reaches load balancing, return it directly
            if (invokers.size() == 1) {
                return invokers.get(0);
            }
            // The concrete selection is implemented by each load balancing strategy
            return doSelect(invokers, url, invocation);
        }

        // Abstract method, implemented in subclasses
        protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

        protected int getWeight(Invoker<?> invoker, Invocation invocation) {
            // Get the weight configured for the current Invoker
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
            if (weight > 0) {
                // Service start time
                long timestamp = invoker.getUrl().getParameter(REMOTE_TIMESTAMP_KEY, 0L);
                if (timestamp > 0L) {
                    // Service running time
                    int uptime = (int) (System.currentTimeMillis() - timestamp);
                    // Service warm-up time, DEFAULT_WARMUP = 10 * 60 * 1000 (ten minutes)
                    int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                    // If the running time is less than the warm-up time, recalculate the weight for the warm-up period
                    if (uptime > 0 && uptime < warmup) {
                        weight = calculateWarmupWeight(uptime, warmup, weight);
                    }
                }
            }
            // Ensure that the returned weight is not less than 0
            return weight >= 0 ? weight : 0;
        }
    }
In AbstractLoadBalance, the getWeight and calculateWarmupWeight methods obtain and calculate the weight of the current Invoker.
getWeight reads the weight configured for the current Invoker from its URL. If the provider has been running for less than the warm-up time, the weight is recalculated and reduced, so that a freshly started service is not handed its full configured share of traffic before it is running healthily.
calculateWarmupWeight recalculates the weight. The formula is uptime / (warmup / weight), which is equivalent to (uptime / warmup) * weight, and it is applied only while uptime < warmup. With the warm-up duration and the configured weight fixed, the longer the service has been running, the closer the result gets to weight, but it never reaches weight.
The result is then clamped. If the recalculated weight is less than 1, 1 is returned; if it lies between 1 and the configured weight, it is returned as is; if it is greater than the configured weight (which cannot happen given the uptime < warmup condition), the configured weight is returned. The recalculated weight therefore lies between 1 and the configured weight, and approaches the configured weight as the running time grows.
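To make the warm-up formula concrete, the following minimal sketch copies the arithmetic of calculateWarmupWeight into a standalone class and evaluates it at a few points in a ten-minute warm-up. The class name WarmupWeightSketch, the configured weight of 100 and the sample uptimes are assumptions chosen for illustration; they do not come from Dubbo.

```java
final class WarmupWeightSketch {

    // Same arithmetic as calculateWarmupWeight in the listing above.
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    public static void main(String[] args) {
        int warmup = 10 * 60 * 1000; // ten-minute warm-up, as in DEFAULT_WARMUP
        int weight = 100;            // illustrative configured weight (assumption)
        int[] uptimesInSeconds = {1, 60, 300, 540, 599};
        for (int seconds : uptimesInSeconds) {
            int effective = calculateWarmupWeight(seconds * 1000, warmup, weight);
            System.out.println(seconds + "s -> " + effective);
        }
    }
}
```

With these inputs the effective weight is 1 after one second (the raw value is below 1 and is clamped), 10 after one minute, 50 after five minutes, 90 after nine minutes and 99 one second before the warm-up ends.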
Configuration
Server
Configure via XML:
    <!-- Service level configuration -->
    <dubbo:service id="xXXXService" interface="top.ytao.service.XXXXService" class="top.ytao.service.impl.XXXXServiceImpl" loadbalance="Load policy" />

    <!-- Method level configuration -->
    <dubbo:service id="xXXXService" interface="top.ytao.service.XXXXService" class="top.ytao.service.impl.XXXXServiceImpl">
        <dubbo:method name="Method name" loadbalance="Load policy"/>
    </dubbo:service>
Configure through Properties:
dubbo.service.loadbalance=Load policy
Configure by annotation:
@Service(loadbalance = "Load policy")
Client
Configure via XML:
    <!-- Service level configuration -->
    <dubbo:reference id="xXXXService" interface="top.ytao.service.XXXXService" loadbalance="Load policy" />

    <!-- Method level configuration -->
    <dubbo:reference id="xXXXService" interface="top.ytao.service.XXXXService">
        <dubbo:method name="Method name" loadbalance="Load policy"/>
    </dubbo:reference>
Configure through Properties:
dubbo.reference.loadbalance=Load policy
Configure by annotation:
@Reference(loadbalance = "Load policy")
The load balancing policy can also be configured through the Dubbo Admin console, as shown in the figure:
Random algorithm
RandomLoadBalance is the default load balancing implementation in Dubbo. It sets the probability of selecting each Invoker according to its weight: the weights of all invokers that reach the load balancing step are summed, each Invoker's share is its weight divided by the total, and a random number is generated in the range from 0 (inclusive) to the total weight (exclusive).
As shown in the figure, if the two providers 192.168.1.10 and 192.168.1.11 have weights of 4 and 6 respectively, their selection probabilities are 2/5 and 3/5.
When the generated random number is 6, it falls in the range 4 to 9 that belongs to 192.168.1.11, so that service is selected.
The doSelect implementation of RandomLoadBalance in Dubbo:
    public class RandomLoadBalance extends AbstractLoadBalance {

        public static final String NAME = "random";

        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // Number of invokers
            int length = invokers.size();
            // Whether all invokers have the same weight
            boolean sameWeight = true;
            // Weight of each Invoker
            int[] weights = new int[length];
            // Weight of the first Invoker
            int firstWeight = getWeight(invokers.get(0), invocation);
            weights[0] = firstWeight;
            // Total weight
            int totalWeight = firstWeight;
            for (int i = 1; i < length; i++) {
                int weight = getWeight(invokers.get(i), invocation);
                // Save the weight of each Invoker in the array
                weights[i] = weight;
                // Accumulate the total weight
                totalWeight += weight;
                // If any weight differs from the first one, mark sameWeight = false
                if (sameWeight && weight != firstWeight) {
                    sameWeight = false;
                }
            }
            // Select by random offset only when the total weight is greater than 0
            // and the weights are not all the same
            if (totalWeight > 0 && !sameWeight) {
                // Generate a random number in [0, totalWeight)
                int offset = ThreadLocalRandom.current().nextInt(totalWeight);
                // Find the Invoker that the random number falls on
                for (int i = 0; i < length; i++) {
                    offset -= weights[i];
                    if (offset < 0) {
                        return invokers.get(i);
                    }
                }
            }
            // If all invokers have the same weight (or the total weight is 0), pick one at random
            return invokers.get(ThreadLocalRandom.current().nextInt(length));
        }
    }
The above is the implementation of the weighted random strategy. The key step is mapping the generated random number to an Invoker. While traversing the weight array, the current weight is subtracted from offset. As soon as offset drops below 0, the random number falls within the range of the current Invoker, and that Invoker is returned.
Take the generated random number 6 as an example and traverse the weights:
- The first round: offset = 6 - 4 = 2 does not satisfy offset < 0, so the traversal continues.
- The second round: offset = 2 - 6 = -4 satisfies offset < 0, so the Invoker at the current index is returned. A negative offset means the random number falls within the weight range of the current Invoker.
The weighted random strategy does not guarantee an exactly proportional split for any particular set of calls. In theory, the more calls are made, the closer the distribution gets to the weight proportions.
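The convergence towards the weight proportions can be observed with a small simulation. The sketch below is independent of Dubbo: the class WeightedRandomSketch and its pick and simulate methods are hypothetical names, the weights 4 and 6 are the ones from the example above, and a fixed seed is assumed only to make runs repeatable. It also assumes every weight is positive.

```java
import java.util.Random;

final class WeightedRandomSketch {

    // Map an offset in [0, totalWeight) to an index by subtracting the weights in order.
    static int pick(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    // Count how often each index is chosen over the given number of calls.
    static int[] simulate(int[] weights, int calls, long seed) {
        int totalWeight = 0;
        for (int weight : weights) {
            totalWeight += weight;
        }
        Random random = new Random(seed); // fixed seed keeps the run repeatable
        int[] hits = new int[weights.length];
        for (int call = 0; call < calls; call++) {
            hits[pick(weights, random.nextInt(totalWeight))]++;
        }
        return hits;
    }

    public static void main(String[] args) {
        int[] weights = {4, 6}; // 192.168.1.10 and 192.168.1.11
        System.out.println("offset 6 -> index " + pick(weights, 6));
        for (int calls : new int[] {10, 1_000, 100_000}) {
            int[] hits = simulate(weights, calls, 42L);
            System.out.printf("%d calls: %.3f / %.3f%n", calls,
                    (double) hits[0] / calls, (double) hits[1] / calls);
        }
    }
}
```

With only ten calls the observed split can be far from 0.4 / 0.6, while with a hundred thousand calls it should be very close to it.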
Least active number algorithm
The LeastActiveLoadBalance policy selects from the invokers with the minimum active number. What is the active number? It is the number of requests an Invoker is currently processing. When an Invoker starts processing a request, its active number is increased by 1; when the request completes, the active number of that Invoker is decreased by 1. Once the invokers with the minimum active number have been found, the final Invoker is chosen among them according to weight. If those invokers all have the same weight, one of them is selected at random.
    public class LeastActiveLoadBalance extends AbstractLoadBalance {

        public static final String NAME = "leastactive";

        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // Number of invokers
            int length = invokers.size();
            // Minimum active number among all invokers, initialised to -1
            int leastActive = -1;
            // Number of invokers that have the minimum active number
            int leastCount = 0;
            // Indexes (in the invokers list) of the invokers that have the minimum active number
            int[] leastIndexes = new int[length];
            // Weight of each Invoker
            int[] weights = new int[length];
            // Total weight of the least-active invokers
            int totalWeight = 0;
            // Weight of the first least-active Invoker
            int firstWeight = 0;
            // Whether all least-active invokers have the same weight
            boolean sameWeight = true;

            // Find the indexes of the least-active invokers
            for (int i = 0; i < length; i++) {
                Invoker<T> invoker = invokers.get(i);
                // Get the active number of the current Invoker
                int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
                // Get the weight
                int afterWarmup = getWeight(invoker, invocation);
                // Save the weight
                weights[i] = afterWarmup;
                // If no minimum has been recorded yet (-1) or the current active number is smaller than leastActive
                if (leastActive == -1 || active < leastActive) {
                    // Reset the minimum active number
                    leastActive = active;
                    // Reset the number of least-active invokers
                    leastCount = 1;
                    // Save the index of the current Invoker at the start of the leastIndexes array
                    leastIndexes[0] = i;
                    // Reset the total weight of the least-active invokers
                    totalWeight = afterWarmup;
                    // Record the current weight as the weight of the first least-active Invoker
                    firstWeight = afterWarmup;
                    // The current Invoker is now the only least-active one, so the weights are trivially the same
                    sameWeight = true;
                // If the current active number equals the recorded minimum
                } else if (active == leastActive) {
                    // Record the index of the current Invoker
                    leastIndexes[leastCount++] = i;
                    // Add the current weight to the total weight
                    totalWeight += afterWarmup;
                    // If the current weight differs from firstWeight, set sameWeight to false
                    if (sameWeight && i > 0 && afterWarmup != firstWeight) {
                        sameWeight = false;
                    }
                }
            }
            // If there is only one least-active Invoker, return it directly
            if (leastCount == 1) {
                return invokers.get(leastIndexes[0]);
            }
            if (!sameWeight && totalWeight > 0) {
                // Select one of the least-active invokers at random according to weight
                int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
                for (int i = 0; i < leastCount; i++) {
                    int leastIndex = leastIndexes[i];
                    offsetWeight -= weights[leastIndex];
                    if (offsetWeight < 0) {
                        return invokers.get(leastIndex);
                    }
                }
            }
            // If all least-active invokers have the same weight, pick one of them at random
            return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
        }
    }
The overall logic of this code is to filter the invokers with the minimum active number out of the invoker list, and then select the final Invoker among them with a strategy similar to the weighted random algorithm.
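How the active number rises and falls, and how it steers the selection, can be sketched without Dubbo. In the sketch below, LeastActiveSketch and its begin, end and select methods are hypothetical names, and providers are represented by array indexes rather than invokers. It is a simplification: it assumes at least one provider and skips the sameWeight shortcut of the real code, which does not change the resulting distribution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

final class LeastActiveSketch {

    private final int[] weights;
    private final AtomicInteger[] active;

    LeastActiveSketch(int... weights) {
        this.weights = weights.clone();
        this.active = new AtomicInteger[weights.length];
        for (int i = 0; i < active.length; i++) {
            active[i] = new AtomicInteger();
        }
    }

    // A request starts on the provider: active number + 1.
    void begin(int index) {
        active[index].incrementAndGet();
    }

    // The request has completed: active number - 1.
    void end(int index) {
        active[index].decrementAndGet();
    }

    // Pick among the providers with the smallest active number, weighted at random.
    int select(Random random) {
        int least = Integer.MAX_VALUE;
        int totalWeight = 0;
        List<Integer> candidates = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            int current = active[i].get();
            if (current < least) {
                // A new minimum: forget the previous candidates.
                least = current;
                candidates.clear();
                totalWeight = 0;
            }
            if (current == least) {
                candidates.add(i);
                totalWeight += weights[i];
            }
        }
        if (candidates.size() == 1) {
            return candidates.get(0);
        }
        if (totalWeight > 0) {
            int offset = random.nextInt(totalWeight);
            for (int index : candidates) {
                offset -= weights[index];
                if (offset < 0) {
                    return index;
                }
            }
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        LeastActiveSketch balancer = new LeastActiveSketch(5, 3, 2);
        Random random = new Random();
        balancer.begin(0);
        balancer.begin(0);
        balancer.begin(1); // active numbers are now [2, 1, 0]
        System.out.println("idle provider wins: " + balancer.select(random));
        balancer.begin(2); // active numbers are now [2, 1, 1]
        System.out.println("tie between 1 and 2, weighted 3:2: " + balancer.select(random));
    }
}
```

A slow provider keeps its active number high for longer and therefore drops out of the candidate set, which is how faster providers end up with more requests.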
Round-robin algorithm
The RoundRobinLoadBalance policy determines the round-robin proportion based on weight. Plain round-robin distributes requests evenly across all nodes, but it cannot adapt to servers with different performance. Weighted round-robin therefore gives each server a share of the requests in the rotation that corresponds to its weight.
    public class RoundRobinLoadBalance extends AbstractLoadBalance {

        public static final String NAME = "roundrobin";

        private static final int RECYCLE_PERIOD = 60000;

        protected static class WeightedRoundRobin {
            private int weight;
            private AtomicLong current = new AtomicLong(0);
            private long lastUpdate;

            public int getWeight() {
                return weight;
            }

            public void setWeight(int weight) {
                this.weight = weight;
                current.set(0);
            }

            public long increaseCurrent() {
                return current.addAndGet(weight);
            }

            public void sel(int total) {
                current.addAndGet(-1 * total);
            }

            public long getLastUpdate() {
                return lastUpdate;
            }

            public void setLastUpdate(long lastUpdate) {
                this.lastUpdate = lastUpdate;
            }
        }

        private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
        private AtomicBoolean updateLock = new AtomicBoolean();

        /**
         * get invoker addr list cached for specified invocation
         * <p>
         * <b>for unit test only</b>
         *
         * @param invokers
         * @param invocation
         * @return
         */
        protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
            String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
            Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
            if (map != null) {
                return map.keySet();
            }
            return null;
        }

        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // key is interface name + method name
            String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
            // Check whether the cache holds an entry for this service method. If not, add a new one
            ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
            if (map == null) {
                methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
                map = methodWeightMap.get(key);
            }
            // Total weight
            int totalWeight = 0;
            long maxCurrent = Long.MIN_VALUE;
            // Current timestamp
            long now = System.currentTimeMillis();
            // Invoker with the largest current
            Invoker<T> selectedInvoker = null;
            // WeightedRoundRobin object of the selected Invoker
            WeightedRoundRobin selectedWRR = null;

            // Traverse the invokers list
            for (Invoker<T> invoker : invokers) {
                // Get the WeightedRoundRobin object from the cache
                String identifyString = invoker.getUrl().toIdentityString();
                WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
                // Get the weight of the current Invoker
                int weight = getWeight(invoker, invocation);

                // If the current Invoker has no WeightedRoundRobin object yet, add a new one
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    weightedRoundRobin.setWeight(weight);
                    map.putIfAbsent(identifyString, weightedRoundRobin);
                }
                // If the weight of the Invoker differs from the cached one, update the cached weight
                if (weight != weightedRoundRobin.getWeight()) {
                    weightedRoundRobin.setWeight(weight);
                }
                // Add the weight to current
                long cur = weightedRoundRobin.increaseCurrent();
                // Set the last update time of the WeightedRoundRobin object
                weightedRoundRobin.setLastUpdate(now);
                // Track the Invoker with the largest current
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = invoker;
                    selectedWRR = weightedRoundRobin;
                }
                // Add the weight to the total weight
                totalWeight += weight;
            }
            // If the size of the invokers list differs from the size of the cached map
            if (!updateLock.get() && invokers.size() != map.size()) {
                if (updateLock.compareAndSet(false, true)) {
                    try {
                        // Copy map to newMap
                        ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                        newMap.putAll(map);
                        // Iterate over newMap
                        Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                        // Remove cached entries that have not been updated within RECYCLE_PERIOD
                        while (it.hasNext()) {
                            Entry<String, WeightedRoundRobin> item = it.next();
                            if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                                it.remove();
                            }
                        }
                        // Put newMap into the cache
                        methodWeightMap.put(key, newMap);
                    } finally {
                        updateLock.set(false);
                    }
                }
            }
            // If an Invoker has been selected
            if (selectedInvoker != null) {
                // Apply current = current - totalWeight
                selectedWRR.sel(totalWeight);
                return selectedInvoker;
            }
            // Normally this point is not reached
            return invokers.get(0);
        }
    }
The selection logic above works as follows. Each Invoker has a current value that starts at 0. On every request, current = current + weight is applied to each Invoker. After all invokers have been traversed, the Invoker with the largest current is selected, and current = current - totalWeight is then applied to the selected Invoker.
Take the 192.168.1.10 and 192.168.1.11 services above, with weights of 4 and 6 respectively. Applying current = current + weight before selection and current = current - totalWeight after selection gives the following:
Number of requests | current before selection | current after selection | Selected service |
---|---|---|---|
1 | [4, 6] | [4, -4] | 192.168.1.11 |
2 | [8, 2] | [-2, 2] | 192.168.1.10 |
3 | [2, 8] | [2, -2] | 192.168.1.11 |
4 | [6, 4] | [-4, 4] | 192.168.1.10 |
5 | [0, 10] | [0, 0] | 192.168.1.11 |
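The table can be reproduced with a few lines of code. The following sketch keeps only the current bookkeeping of the algorithm and leaves out the caching, weight refresh and recycling done by the real class; SmoothRoundRobinSketch and its methods are hypothetical names, and it assumes at least one provider.

```java
import java.util.Arrays;

final class SmoothRoundRobinSketch {

    private final int[] weights;
    private final long[] current;

    SmoothRoundRobinSketch(int... weights) {
        this.weights = weights.clone();
        this.current = new long[weights.length]; // every current starts at 0
    }

    // One request: add each weight to its current, select the largest, subtract the total from it.
    int next() {
        int totalWeight = 0;
        int selected = -1;
        for (int i = 0; i < weights.length; i++) {
            current[i] += weights[i];
            totalWeight += weights[i];
            // Strict comparison: on a tie the earlier provider stays selected.
            if (selected < 0 || current[i] > current[selected]) {
                selected = i;
            }
        }
        current[selected] -= totalWeight;
        return selected;
    }

    long[] snapshot() {
        return current.clone();
    }

    public static void main(String[] args) {
        SmoothRoundRobinSketch balancer = new SmoothRoundRobinSketch(4, 6); // 192.168.1.10, 192.168.1.11
        for (int request = 1; request <= 5; request++) {
            int selected = balancer.next();
            System.out.println(request + ": selected index " + selected
                    + ", current after selection " + Arrays.toString(balancer.snapshot()));
        }
    }
}
```

After five requests both current values are back at 0, so the pattern repeats: out of every ten requests, four go to 192.168.1.10 and six to 192.168.1.11, interleaved rather than sent in bursts.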
Consistent Hash algorithm
The ConsistentHashLoadBalance policy sends requests with the same parameters to the same machine. The service nodes are placed on a ring, and requests are hashed onto the same ring. Starting from the position of the request on the ring, the first service node found clockwise handles the request. As shown in the figure:
To keep requests from hashing unevenly, Dubbo maps each Invoker to multiple virtual nodes on the ring, which spreads the calls more evenly.
The consistent hash settings can be changed as follows:
    <!-- By default Dubbo hashes only the first argument; this sets which argument index is used for hashing -->
    <dubbo:parameter key="hash.arguments" value="1" />
    <!-- Number of virtual nodes -->
    <dubbo:parameter key="hash.nodes" value="200" />
The consistent hash is implemented as follows:
    public class ConsistentHashLoadBalance extends AbstractLoadBalance {

        public static final String NAME = "consistenthash";

        /**
         * Hash nodes name
         */
        public static final String HASH_NODES = "hash.nodes";

        /**
         * Hash arguments name
         */
        public static final String HASH_ARGUMENTS = "hash.arguments";

        private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

        @SuppressWarnings("unchecked")
        @Override
        protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            // Get the requested method name
            String methodName = RpcUtils.getMethodName(invocation);
            // key = interface name + method name
            String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
            // Identity hash code of the invokers list
            int identityHashCode = System.identityHashCode(invokers);
            // Check whether the cache holds a selector for the key and whether the invokers list has changed.
            // If the selector is missing or stale, create a new one and cache it
            ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
            if (selector == null || selector.identityHashCode != identityHashCode) {
                selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
                selector = (ConsistentHashSelector<T>) selectors.get(key);
            }
            return selector.select(invocation);
        }

        // ConsistentHashSelector class ...
    }
doSelect mainly checks the cache and checks whether the invoker list has changed. The consistent hash load balancing itself is implemented in the inner class ConsistentHashSelector.
    private static final class ConsistentHashSelector<T> {

        // Virtual nodes
        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        // Number of virtual nodes per Invoker
        private final int replicaNumber;

        // Identity hash code of the Invoker list, used to detect whether the list has changed
        private final int identityHashCode;

        // Indexes of the request parameters used for hashing
        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // Get the number of virtual nodes
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            // Get the configured parameter indexes
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                // Get the address of the Invoker, including the port number
                String address = invoker.getUrl().getAddress();
                // Create the virtual nodes
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        // Find the Invoker
        public Invoker<T> select(Invocation invocation) {
            // Convert the parameters to a string
            String key = toKey(invocation.getArguments());
            // Convert the string to an MD5 digest
            byte[] digest = md5(key);
            // Find the Invoker by the hash of the digest
            return selectForKey(hash(digest, 0));
        }

        // Concatenate the parameters into a string
        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        // Match the hash to the corresponding Invoker
        private Invoker<T> selectForKey(long hash) {
            // Find the first virtual node whose key is greater than or equal to the hash
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            if (entry == null) {
                entry = virtualInvokers.firstEntry();
            }
            return entry.getValue();
        }

        // hash operation
        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        // md5 operation
        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            md5.update(bytes);
            return md5.digest();
        }
    }
The consistent hash implementation first creates the virtual nodes and stores them in a TreeMap. Each key of the TreeMap is derived from the Invoker address: the address plus a sequence number is run through MD5, and the hash operation is then applied to the MD5 digest. The value of the TreeMap is the Invoker that the virtual node stands for.
Finally, when a request arrives, the hash of its configured parameters is calculated in the same way and used to look up the Invoker in the TreeMap.
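The ring lookup can be tried out in isolation. The sketch below mirrors the virtual-node construction and the ceilingEntry lookup of the listing above, but stores plain address strings instead of invokers; HashRingSketch, the sample addresses and the key format are assumptions for illustration. It assumes a non-empty address list and at least four replicas per address.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HashRingSketch {

    // Ring position -> provider address; a TreeMap keeps the positions sorted.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(List<String> addresses, int replicas) {
        for (String address : addresses) {
            // Each MD5 digest yields four 32-bit positions, as in the listing above.
            for (int i = 0; i < replicas / 4; i++) {
                byte[] digest = md5(address + i);
                for (int h = 0; h < 4; h++) {
                    ring.put(hash(digest, h), address);
                }
            }
        }
    }

    // First virtual node at or after the key's position, wrapping around to the start of the ring.
    String select(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(md5(key), 0));
        if (entry == null) {
            entry = ring.firstEntry();
        }
        return entry.getValue();
    }

    static long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    static byte[] md5(String value) {
        try {
            return MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical provider addresses.
        List<String> addresses = Arrays.asList("192.168.1.10:20880", "192.168.1.11:20880", "192.168.1.12:20880");
        HashRingSketch ring = new HashRingSketch(addresses, 160);
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> " + ring.select(key));
        }
    }
}
```

Because the position of a virtual node depends only on its own address, removing one provider moves only the keys that were mapped to it; keys owned by the remaining providers stay where they were.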
Summary
Dubbo's load balancing implementation is elegantly written, and its code is worth studying for the design thinking behind it. The implementation principles need careful study; otherwise the ideas are hard to follow.