Sentinel源码解析入口类和SlotChain构建过程详解

作者:hsfxuebao 时间:2022-06-14 18:29:50 

1. 测试用例

我们以sentinel-demo中的sentinel-annotation-spring-aop为例,分析sentinel的源码。核心代码如下:

DemoController:

@RestController
public class DemoController {
   @Autowired
   private TestService service;
   @GetMapping("/foo")
   public String apiFoo(@RequestParam(required = false) Long t) throws Exception {
       if (t == null) {
           t = System.currentTimeMillis();
       }
       service.test();
       return service.hello(t);
   }
   @GetMapping("/baz/{name}")
   public String apiBaz(@PathVariable("name") String name) {
       return service.helloAnother(name);
   }
}

TestServiceImpl:

@Service
public class TestServiceImpl implements TestService {
   @Override
   @SentinelResource(value = "test", blockHandler = "handleException", blockHandlerClass = {ExceptionUtil.class})
   public void test() {
       System.out.println("Test");
   }
   @Override
   @SentinelResource(value = "hello", fallback = "helloFallback")
   public String hello(long s) {
       if (s < 0) {
           throw new IllegalArgumentException("invalid arg");
       }
       return String.format("Hello at %d", s);
   }
   @Override
   @SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback",
       exceptionsToIgnore = {IllegalStateException.class})
   public String helloAnother(String name) {
       if (name == null || "bad".equals(name)) {
           throw new IllegalArgumentException("oops");
       }
       if ("foo".equals(name)) {
           throw new IllegalStateException("oops");
       }
       return "Hello, " + name;
   }
   public String helloFallback(long s, Throwable ex) {
       // Do some log here.
       ex.printStackTrace();
       return "Oops, error occurred at " + s;
   }
   public String defaultFallback() {
       System.out.println("Go to default fallback");
       return "default_fallback";
   }
}

启动类DemoApplication

@SpringBootApplication
public class DemoApplication {
   public static void main(String[] args) {
       SpringApplication.run(DemoApplication.class, args);
   }
}

在启动这个工程上增加参数:

-Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=annotation-aspectj

如图:

Sentinel源码解析入口类和SlotChain构建过程详解

打开http://localhost:8081/#/dashboard 地址,可以看到应用已经注册到sentinel管理后台:

Sentinel源码解析入口类和SlotChain构建过程详解

1.1 流控测试

访问 http://localhost:19966/foo?t=188 这个链接,多访问几次,在实时监控页面可以看到:

Sentinel源码解析入口类和SlotChain构建过程详解

然后,我们先简单配置一个流控规则,如下:

Sentinel源码解析入口类和SlotChain构建过程详解

其中,资源名为:

Sentinel源码解析入口类和SlotChain构建过程详解

然后我们在快速刷新http://localhost:19966/foo?t=188 接口,会出现限流的情况,返回如下:

Oops, error occurred at 188

实时监控为:

Sentinel源码解析入口类和SlotChain构建过程详解

2. 注解版源码分析

使用注解@SentinelResource 核心原理就是 利用AOP切入到方法中,我们直接看SentinelResourceAspect类,这是一个切面类:

@Aspect // 切面
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
   // 指定切入点为@SentinelResource 注解
   @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
   public void sentinelResourceAnnotationPointcut() {
   }
   // 环绕通知
   @Around("sentinelResourceAnnotationPointcut()")
   public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
       Method originMethod = resolveMethod(pjp);
       SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
       if (annotation == null) {
           // Should not go through here.
           throw new IllegalStateException("Wrong state for SentinelResource annotation");
       }
       String resourceName = getResourceName(annotation.value(), originMethod);
       EntryType entryType = annotation.entryType();
       int resourceType = annotation.resourceType();
       Entry entry = null;
       try {
           // 要织入的,增强的功能
           entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
           // 调用目标方法
           return pjp.proceed();
       } catch (BlockException ex) {
           return handleBlockException(pjp, annotation, ex);
       } catch (Throwable ex) {
           Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
           // The ignore list will be checked first.
           if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
               throw ex;
           }
           if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
               traceException(ex);
               return handleFallback(pjp, annotation, ex);
           }
           // No fallback function can handle the exception, so throw it out.
           throw ex;
       } finally {
           if (entry != null) {
               entry.exit(1, pjp.getArgs());
           }
       }
   }
}

核心方法SphU.entry():

public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
   throws BlockException {
   // 注意 第4个参数值为 1
   return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
   throws BlockException {
   // count 参数:表示当前请求可以增加多少个计数
   // 注意 第5个参数为false
   return entryWithType(name, resourceType, entryType, count, false, args);
}
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                          Object[] args) throws BlockException {
   // 将信息封装为一个资源对象
   StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
   // 返回一个资源操作对象entry
   // prioritized 为true 表示当前访问必须等待"根据其优先级计算出的时间"后才通过
   // prioritized 为 false 则当前请求无需等待
   return entryWithPriority(resource, count, prioritized, args);
}

我们重点看一下CtSph#entryWithPriority

/**
* @param resourceWrapper
* @param count 默认为1
* @param prioritized 默认为false
* @param args
* @return
* @throws BlockException
*/
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
   throws BlockException {
   // 从ThreadLocal中获取Context
   // 一个请求会占用一个线程,一个线程会绑定一个context
   Context context = ContextUtil.getContext();
   // 若context是 NullContext类型,则表示当前系统中的context数量已经超过阈值
   // 即访问的请求的数量已经超出了阈值,此时直接返回一个无需做规则检测的资源操作对象
   if (context instanceof NullContext) {
       // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
       // so here init the entry only. No rule checking will be done.
       return new CtEntry(resourceWrapper, null, context);
   }
   // 当前线程中没有绑定context,则创建一个context并将其放入到Threadlocal
   if (context == null) {
       // todo Using default context.
       context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
   }
   // Global switch is close, no rule checking will do.
   // 若全局开关是关闭的,直接返回一个无需做规则检测的资源操作对象
   if (!Constants.ON) {
       return new CtEntry(resourceWrapper, null, context);
   }
   // todo 查找SlotChain
   ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
   /*
    * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
    * so no rule checking will be done.
    */
   // 若没有知道chain,则意味着chain数量超出了阈值
   if (chain == null) {
       return new CtEntry(resourceWrapper, null, context);
   }
   // 创建一个资源操作对象
   Entry e = new CtEntry(resourceWrapper, chain, context);
   try {
       // todo 对资源进行操作
       chain.entry(context, resourceWrapper, null, count, prioritized, args);
   } catch (BlockException e1) {
       e.exit(count, args);
       throw e1;
   } catch (Throwable e1) {
       // This should not happen, unless there are errors existing in Sentinel internal.
       RecordLog.info("Sentinel unexpected exception", e1);
   }
   return e;
}

2.1 默认Context创建

当前线程没有绑定Context,则创建一个context并将其放入到Threadlocal。核心方法为 InternalContextUtil.internalEnter

public static Context enter(String name, String origin) {
   if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
       throw new ContextNameDefineException(
           "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
   }
   return trueEnter(name, origin);
}
protected static Context trueEnter(String name, String origin) {
   // 尝试从ThreadLocal中获取context
   Context context = contextHolder.get();
   // 若Threadlocal中没有,则尝试从缓存map中获取
   if (context == null) {
       // 缓存map的key为context名称,value为EntranceNode
       Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
       // DCL 双重检测锁,防止并发创建对象
       DefaultNode node = localCacheNameMap.get(name);
       if (node == null) {
           // 若缓存map的size 大于 context数量的最大阈值,则直接返回NULL_CONTEXT
           if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
               setNullContext();
               return NULL_CONTEXT;
           } else {
               LOCK.lock();
               try {
                   node = contextNameNodeMap.get(name);
                   if (node == null) {
                       if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                           setNullContext();
                           return NULL_CONTEXT;
                       } else {
                           // 创建一个EntranceNode
                           node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                           // Add entrance node.
                           // 将新建的node添加到Root
                           Constants.ROOT.addChild(node);
                           // 将新建的node写入到缓存map
                           // 为了防止"迭代稳定性问题"-iterate stable 对于共享集合的写操作
                           Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                           newMap.putAll(contextNameNodeMap);
                           newMap.put(name, node);
                           contextNameNodeMap = newMap;
                       }
                   }
               } finally {
                   LOCK.unlock();
               }
           }
       }
       // 将context的name与entranceNode 封装成context
       context = new Context(node, name);
       // 初始化context的来源
       context.setOrigin(origin);
       // 将context写入到ThreadLocal
       contextHolder.set(context);
   }
   return context;
}

注意:因为 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();是 HashMap结构,所以存在并发安全问题,采用 代码中方式进行添加操作。

2.2 查找并创建SlotChain

构建调用链lookProcessChain(resourceWrapper)

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
   // 缓存map的key为资源 value为其相关的SlotChain
   ProcessorSlotChain chain = chainMap.get(resourceWrapper);
   // DCL
   // 若缓存中没有相关的SlotChain 则创建一个并放入到缓存中
   if (chain == null) {
       synchronized (LOCK) {
           chain = chainMap.get(resourceWrapper);
           if (chain == null) {
               // Entry size limit.
               // 缓存map的size 大于 chain数量的最大阈值,则直接返回null,不在创建新的chain
               if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                   return null;
               }
               // todo 创建新的chain
               chain = SlotChainProvider.newSlotChain();
               // 防止 迭代稳定性问题
               Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                   chainMap.size() + 1);
               newMap.putAll(chainMap);
               newMap.put(resourceWrapper, chain);
               chainMap = newMap;
           }
       }
   }
   return chain;
}

我们直接看核心方法SlotChainProvider.newSlotChain();

public static ProcessorSlotChain newSlotChain() {
       // 若builder不为null,则直接使用builder构建一个chain
       // 否则先创建一个builder
       if (slotChainBuilder != null) {
           return slotChainBuilder.build();
       }
       // Resolve the slot chain builder SPI.
       // 通过SPI方式创建builder
       slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
       // 若通过SPI未能创建builder,则创建一个默认的DefaultSlotChainBuilder
       if (slotChainBuilder == null) {
           // Should not go through here.
           RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
           slotChainBuilder = new DefaultSlotChainBuilder();
       } else {
           RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
               slotChainBuilder.getClass().getCanonicalName());
       }
       // todo 构建一个chain
       return slotChainBuilder.build();
   }
   private SlotChainProvider() {}
}

2.2.1 创建slotChainBuilder

// 通过SPI方式创建builder
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

通过SPI方法创建slotChainBuilder,去项目中META-INF.service中获取:

Sentinel源码解析入口类和SlotChain构建过程详解

2.2.2 slotChainBuilder.build()

@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
   @Override
   public ProcessorSlotChain build() {
       ProcessorSlotChain chain = new DefaultProcessorSlotChain();
       // 通过SPI方式构建Slot
       List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
       for (ProcessorSlot slot : sortedSlotList) {
           if (!(slot instanceof AbstractLinkedProcessorSlot)) {
               RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
               continue;
           }
           chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
       }
       return chain;
   }
}

通过SPI机制,去项目中META-INF.service中获取,在sentinel-core项目中:

Sentinel源码解析入口类和SlotChain构建过程详解

还有一个ParamFlowSlot,在sentinel-extension/sentinel-parameter-flow-control下:

Sentinel源码解析入口类和SlotChain构建过程详解

我们点击 NodeSelectorSlot, 类上面是有 优先级order,数字越小,优先级越高。

@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot&lt;Object&gt; {

优先级常量为:

public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
public static final int ORDER_LOG_SLOT = -8000;
public static final int ORDER_STATISTIC_SLOT = -7000;
public static final int ORDER_AUTHORITY_SLOT = -6000;
public static final int ORDER_SYSTEM_SLOT = -5000;
public static final int ORDER_FLOW_SLOT = -2000;
public static final int ORDER_DEGRADE_SLOT = -1000;

我们看代码中的变量sortedSlotList,已经按照优先级排序好了:

Sentinel源码解析入口类和SlotChain构建过程详解

我们看一下构建的ProcessorSlotChain,类似一个单链表结构,如下:

Sentinel源码解析入口类和SlotChain构建过程详解

我们看一下相关的类结构:DefaultProcessorSlotChain:

// 这是一个单向链表,默认包含一个接节点,且有两个指针first 和end同时指向这个节点
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
   AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
       @Override
       public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
           throws Throwable {
           super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
       }
       @Override
       public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
           super.fireExit(context, resourceWrapper, count, args);
       }
   };
   AbstractLinkedProcessorSlot<?> end = first;
   @Override
   public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
       protocolProcessor.setNext(first.getNext());
       first.setNext(protocolProcessor);
       if (end == first) {
           end = protocolProcessor;
       }
   }
   @Override
   public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
       end.setNext(protocolProcessor);
       end = protocolProcessor;
   }
}

AbstractLinkedProcessorSlot:

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
   // 声明一个同类型的变量,则可以指向下一个Slot节点
   private AbstractLinkedProcessorSlot<?> next = null;
   @Override
   public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
       throws Throwable {
       if (next != null) {
           next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
       }
   }
   @SuppressWarnings("unchecked")
   void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
       throws Throwable {
       T t = (T)o;
       entry(context, resourceWrapper, t, count, prioritized, args);
   }
   @Override
   public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
       if (next != null) {
           next.exit(context, resourceWrapper, count, args);
       }
   }
   public AbstractLinkedProcessorSlot<?> getNext() {
       return next;
   }
   public void setNext(AbstractLinkedProcessorSlot<?> next) {
       this.next = next;
   }
}

构建完成后的SlotChain和工作原理图一样:

Sentinel源码解析入口类和SlotChain构建过程详解

接下来,对资源进行操作的核心方法为chain.entry(context, resourceWrapper, null, count, prioritized, args);,这个我们下篇文章分析。

参考文章

Sentinel1.8.5源码github地址(注释)

Sentinel官网

来源:https://juejin.cn/post/7148760487574896654

标签:Sentinel,入口类,SlotChain,构建
0
投稿

猜你喜欢

  • C#组合函数的使用详解

    2022-01-24 04:22:41
  • SVN报错:Error Updating changes:svn:E155037的解决方案

    2023-06-11 07:27:11
  • java微信公众号支付示例详解

    2023-11-15 05:52:01
  • Unity实现跑马灯抽奖效果

    2022-10-09 04:09:54
  • C#读取word中表格数据的方法实现

    2023-09-12 22:54:53
  • 整理总结Java多线程程序编写的要点

    2022-02-19 16:17:23
  • Java实现提取QSV文件视频内容

    2023-08-24 13:33:45
  • SpringBoot框架RESTful接口设置跨域允许

    2021-12-31 13:40:19
  • C#引用类型转换的常见方式总结

    2022-03-02 16:53:58
  • C#使用SQL DataAdapter数据适配代码实例

    2021-11-29 11:51:15
  • SpringBoot 静态资源导入及首页设置问题

    2023-11-26 22:45:07
  • 使用Jacoco获取 Java 程序的代码执行覆盖率的步骤详解

    2022-07-22 00:25:13
  • C#深拷贝方法探究及性能比较(多种深拷贝)

    2022-08-30 18:17:02
  • 实现java简单的线程池

    2023-08-09 06:05:15
  • Java框架---Spring详解

    2021-07-09 14:27:30
  • 如何设计一个安全的API接口详解

    2023-03-06 14:57:03
  • java JUC信号量Semaphore原理及使用介绍

    2023-01-02 02:08:57
  • Flutter开发中的路由参数处理

    2023-06-21 04:27:48
  • java实现图书馆管理系统

    2023-12-10 15:50:29
  • java Socket实现网页版在线聊天

    2022-10-19 12:13:42
  • asp之家 软件编程 m.aspxhome.com