Sentinel source code analysis

edition:

  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
  <version>2.2.5.RELEASE</version>

In spring In factories:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\     #The reason why the controller layer can also be protected by resources
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\   #Core configuration class
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration  #Feign integrates Sentinel

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration

Mainly see: SentinelAutoConfiguration, without configuring any parameters, takes effect by default.

There is an important bean, SentinelResourceAspect, which intercepts the resources identified by the @SentinelResource annotation.

 

 

The source code is not long. Post the code:

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
    
// Block resources identified by the @SentinelResource annotation @Pointcut(
"@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try {
// Enable the configuration rules of resource protection, flow control, degradation, etc entry
= SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// Business logic
return pjp.proceed(); } catch (BlockException ex) {
// Handle Block exceptions. All Sentinel rule exceptions go here
return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) {
// Business exceptions go here Class
<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) {
// Execute exit logic entry.exit(
1, pjp.getArgs()); } } } }

 

Follow: sphu The logic of entry (resourcename, resourcetype, entrytype, pjp.getargs())

There are processorslot<object> chain = lookprocesschain (resourcewrapper) under ctsph\entrywithpriority; To build a Slotchain.

Sentinel abstracts each of the flow control rules, degradation rules, system rules, hotspot rules, and authorization rules configured on the page into a Slot, such as FlowSlot processing flow control rules, AuthoritySlot processing authorization rules, StatisticSlot statistical request QPS, number of successful requests, number of failed requests, and number of blocked requests,

Maximum response time, etc. The chain constructed above is to string these slot s through the responsibility chain mode, and then execute them.

Take a look:

 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
// Each resource maintains a slotChain chain
= chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // Build responsibility chain chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }

 

Follow: defaultslotchainbuilder\build()

 public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //  Sentinel maintains a set of Spi mechanisms to load the implementation classes of the ProcessorSlot interface. Those regular slots will be sorted according to the order in the custom annotation @Spi on the slot
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
              // Put the loaded slot into the responsibility chain
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }

 

Under the sentinel core package, there are corresponding files loaded by spi:

 

The ProcessorSlotChain finally constructed is the following chain:

Each slot implements the ProcessorSlot interface, and the specific content is in the entry method.

 NodeSelectorSlot:

Build the resource link, such as the tree structure under the cluster link in the console.

 ClusterBuilderSlot:

Building resource nodes in the cluster will be used in cluster current limiting.

 LogSlot:

Just keep some error logs.

     

 

 StatisticSlot:

This slot is more important. As can be seen from the name, it is used for statistics. For those qps, the number of successes, failures and blocking will be recorded@ Override

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking. Execute next slot No previous operation
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
// After executing the slot behind the responsibility chain and before executing our business logic, we will add 1 to qps
// Number of threads requested plus 1
node.increaseThreadNum();
// The number of qps requests plus 1 is the number of bucket s in the sliding time window. This will be explained in detail later node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers.
// You can extend and execute callBack by yourself
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) {
// Handling of sentinel rule exceptions
// Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // Add block count.
// Increase qps of block node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) {
// Unexpected internal error, set error to current entry. context.getCurEntry().setError(e); throw e; } }

 

AuthoritySlot

Verification of authorization rules

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
// Black and white list rules for verifying authorization rules checkBlackWhiteAuthority(resourceWrapper, context); fireEntry(context, resourceWrapper, node, count, prioritized, args); }

 

SystemSlot

Execute system rules.

FlowSlot  

Execute flow restriction rules.

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);
         // Trigger the next slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

checker is an instance of FlowRuleChecker.

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
// Get all current limiting rules Collection
<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) {
// Verify whether flow control passes
if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }

canPassCheck will queue up and wait according to the fast failure selected in the flow control rule. The DefaultController, WarmUpRateLimiterController, and RateLimiterController classes are used to control respectively. The current limiting algorithms used are sliding time windows,

Token bucket, leaky bucket algorithm.

Finally, we will go to defaultcontroller\canpass, where the quick failure rule will go. The sliding time window is used.

 public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// To get the qps that have passed from the sliding time window is to compare whether the traffic in the last 1s has reached the threshold
int curCount = avgUsedTokens(node);
// acquireCount is usually 1 to judge whether the current limit requirements are met
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
// You don't usually walk here
long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }

DegradeSlot

Used by the fusing rules.

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
// The circuit breaker is obtained according to the fuse rules configured on the console. There are two types of circuit breakers: ExceptionCircuitBreaker. It is used when the fuse strategy is: abnormal proportion and abnormal number.
// ResponseTimeCircuitBreaker: the fusing strategy is: used when calling the slow proportion List
<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } for (CircuitBreaker cb : circuitBreakers) { if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } }

The above important current limiting logic and degradation logic will be analyzed separately. Here, the overall process will be recorded first.

After the entry method on the responsibility chain is executed, if there is no BlockException exception, that is, the configured rules pass normally, the business logic will be executed. After the business logic is executed, the SentinelResourceAspect above

The finally method in the aspect will call the exit method on the responsibility chain.

 

In the exit method, we mainly do some data statistics. Flow control and degradation mentioned above are involved. This will be recorded when analyzing the flow control algorithm and degradation.

Sentinel's core process is the implementation of this responsibility chain. The verification of various rules is performed on this chain.

 

Tags: Microservices sentinel

Posted by padams on Thu, 02 Jun 2022 00:22:24 +0530