Sentinel基本使用与源码分析 天天亮点
系列文章目录和关于我
(相关资料图)
一丶什么是SentinelSentinel官网
Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点
,从流量路由
、流量控制
、流量整形
、熔断降级
、系统自适应过载保护
、热点流量防护
等多个维度来帮助开发者保障微服务的稳定性。
流量整形:限制流出某一网络的某一连接的流量与突发,使这类报文以比较均匀的速度向外发送。流量整形通常使用缓冲区和令牌桶来完成,当报文的发送速度过快时,首先在缓冲区进行缓存,在令牌桶的控制下再均匀地发送这些被缓冲的报文
二丶主要功能1.流量控制任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制
流量控制有以下几个角度:
资源的调用关系,例如资源的调用链路,资源和资源之间的关系;运行指标,例如 QPS、线程池、系统负载等;控制的效果,例如直接限流、冷启动、排队等。2.熔断降级当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。
为何发生雪崩:如下图中服务D奄奄一息,其他服务对它具备依赖,对服务D发起调用的时候常常超时,RT很大,导致服务G线程都block在调用服务D的这一步中,导致服务G也奄奄一息,久而久之其他服务都处于不可服务的状态。
3.系统负载保护当系统负载较高的时候,如果还持续让请求进入,可能会导致系统崩溃,无法响应。Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
三丶基本使用1.sentinel依赖引入 com.alibaba.csp sentinel-core 版本号
2.初始化限流规则private static void initFlowQpsRule() { //限流规则列表 List rules = new ArrayList(); //第一个规则 FlowRule rule1 = new FlowRule(); //rule1针对什么资源(资源:指你要对什么限流,一般是方法名称) rule1.setResource(KEY); // 设置限流阈值为20 rule1.setCount(20); //限流策略: // 0:线程数(那么就是20个线程并发访问的时候限流) //1:qps:每秒访问资源的数量,超过20进行限流 rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); //将受来源限制的应用程序名称 rule1.setLimitApp("default"); rules.add(rule1); FlowRuleManager.loadRules(rules); }
3.限流使用Entry entry = null;try { entry= SphU.entry("资源名称"); //执行正常业务逻辑} catch (BlockException e) { //执行被限流后的业务逻辑} finally { if (entry != null) { entry.close(); }}
4.控制台设置限流规则5.控制台设置降级规则四丶基本原理1.基本概念Entry
在Sentinel中,所有的资源都对应一个资源名称以及一个 Entry。
Entry负责记录当前调用的信息:
创建时间:用于统计rt当前节点Node:当前上下文中资源的统计信息(Node是Sentinel中的一个接口,负责记录实时统计信息)。来源Node:调用来源方信息。Slot
每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)。这些插槽有不同的职责。
2.原理图五丶源码学习Sentinel不仅支持单机流控,还支持集群流控制,个人认为在流量分配均匀的情况下,单机流控完全够用了,并且集群流控需要额外的资源来进行集群服务器信息同步,感觉用处不是很大。
0.Sentinel SPI一个框架需要考虑到扩展性,实现扩展性的一个很好的方式就是SPI(Service Provider Interface)SPI可与实现将装配的控制权移到程序之外,实现使用方和提供方的解耦。
Sentinel提供了SpiLoader方便进行SPI服务实现的加载,Sentinel中很多核心组件都依赖此类进行加载。常见使用如下
SpiLoader.of(xxx.class).loadInstanceListSorted()
0.1 of 方法创建SpiLoader实例这种double check + synchronized实现线程安全的方式在Sentinel中非常常见。
0.2 SpiLoader对象加载服务提供者SpiLoader会使用ClassLoader读取META-INF/services/服务提供者class全限定类名
文件中的数据,并解析@Spi注解中的内容,决定是否单例,是否默认实现,以及顺序等内容,加载服务实现并返回结果。
FlowRuleManager内部使用map来存储管理资源和对应的流控规则(一个资源可存在多个流控规则)。
另外还提供PropertyListener来实现FlowRule变化后的回调。
2.SphU.entry 进行流控2.1 Env 调用InitFunc#init这里会使用Env中的static final单例CtSph对象进行流控规则,并且会进行InitFunc#init的调用
2.2 包装资源为StringResourceWrapperResourceWrapper是对资源的包装,存在两个实现,MethodResourceWrapper在调用SphU#entry(Method method)的时候使用到。
上面原理图中提到,Sentinel具备slot链条,在获取链条的时候,会根据ResourceWrapper从缓存map中获取,hash的规则是ResourceWrapper的名称,MethodResourceWrapper的名称是:方法定义类全限定名称:方法名称(参数类型全限定类名)
。
Context保存当前调用元数据,首先会通过ContextUtil从ThreadLocal中获取,如果不存在会new一个,并且设置到ThreadLocal中,随着Entry#close方法的时候会进行资源释放。
构建Context的时候,会设置其中的Node(名称为sentinel_default_context的EntranceNode),
Node在Sentinel中用来生成树状结构,描述方法的调用
2.3.2 lookProcessChain构建Slot执行链条slot链条式sentinel实现流量统计,和限流的核心,获取slot链条ProcessorSlot的代码如下
从缓存map中获取SPI获取SlotChainBuilder
这里是一个扩展点,我们可与实现基于配置中心的chainBuilder,实现slot链条配置化
DefaultSlotChainBuilder 构造ProcessorSlotChain
ProcessorSlotChain也是一个ProcessorSlot处理器插槽,内部使用net属性串联下一个AbstractLinkedProcessorSlot。
我们可扩展自己的ProcessorSlot,使用SPI机制轻松加入到ProcessorSlotChain中。
2.3.2 依次执行ProcessorSlot#entry1.NodeSelectorSlotNodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级
ClusterBuilderSlot会生成用于存储资源的统计信息以及调用者信息的ClusterNode,ClusterNode会负责存储该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据。
3.StatisticSlot用于记录、统计不同纬度的 runtime 指标监控信息
统计请求通过数量使用了StatisticNode#addPassRequest方法
最终都是使用ArrayMetric进行记录, ArrayMetric 内部使用OccupiableBucketLeapArray或者BucketLeapArray进行计数,具体如何记录在后续章节中分析。
4.AuthoritySlot基于白名单黑名单逻辑的权限校验,默认情况是没有启用的。
5.SystemSlot通过系统的状态,例如 load,cpu使用率等,来控制总的入口流量。qps使用的是滑动窗口算法进行统计,load,cpu使用率这些指标通过com.sun.management.OperatingSystemMXBean获取。
6.FlowSlot根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
可看到,每一个配置的FlowRule,都会调用canPassCheck检查,只要存在任何一个不满足要求,都会抛出FlowException
这里会根据FlowRule#setControlBehavior的不同选择不同的TrafficShapingController进行校验,也可以通过FlowRule#setRater直接指定的实现。
DefaultController :默认流量整形控制器,超过任何规则的阈值后,新的请求就会立即拒绝,拒绝方式为抛出FlowException
这里代码逻辑较为简单,获取当前qps或者线程数,如果acquireCount加上当前qps大于count(阈值)那么返回false。
另外可以看到Sentinel提供了预占能力。
实现的难点在于怎么统计qps,统计线程数,这部分在后续章节中进行学习。
ThrottlingController :均速排队,严格控制请求通过的时间间隔,也即是让请求以均匀的速度通过,对应的是漏桶算法。下面是判断是否通过的代码:
private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) { //当前时间 long currentTime = TimeUtil.currentTimeMillis(); //statDurationMs = 1000ms 即1s(假设qps限制为20,maxCountPerStat=20) //1000ms * 1(请求数量)/ maxCountPerStat = 产生1个令牌所需要耗费的时间 long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat); // costTime + 上一个请求通过的时间 = 什么时间,此令牌可以产生 long expectedTime = costTime + latestPassedTime.get(); //如果当前时间小于 满足令牌要求的时间(expectedTime) if (expectedTime <= currentTime) { // Contention may exist here, but it"s okay. //设置最后通过请求的时间(latestPassedTime是AtomicLong) latestPassedTime.set(currentTime); return true; } else { // 计算等待的时间 long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); //等待的时间超过了排队等待的阈值(默认情况下500ms)那么返回false if (waitTime > maxQueueingTimeMs) { return false; } //上一次请求通过的时间,加上产生令牌需要的时间(addAndGet是自旋+cas操作) long oldTime = latestPassedTime.addAndGet(costTime); //等待时间 waitTime = oldTime - TimeUtil.currentTimeMillis(); //如果超过了等待阈值 那么cas减小时间,相当于回滚操作 if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false; } // sleep当前线程进行等待 if (waitTime > 0) { sleepMs(waitTime); } return true; }}
其实这个代码也不是天衣无缝:
这段代码也有很妙的地方:
WarmUpRateLimiterController/WarmUpController: 预热/冷启动方式。当系统长期处理低水平的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值的上限,给系统一个预热的时间,避免冷系统被压垮。
这部分源码设计到guava的预热算法,后续了解学习
7.DegradeSlot&DefaultCircuitBreakerSlot二者都是熔断器插槽,并且短路原理一致,DegradeSlot在用户个资源指定DegradeRule(降级规则)的时候会根据DegradeRule构造出断路器CircuitBreaker,而DefaultCircuitBreakerSlot则是用户配置同一的短路规则,并没有给资源指定特定规则的时候,会使用默认规则生成CircuitBreaker。
DegradeRule分为三种:
RuleConstant.DEGRADE_GRADE_RT根据rt来进行熔断,对应ResponseTimeCircuitBreaker(响应时间断路器)
RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO根据异常比列进行熔断,对应ExceptionCircuitBreaker
RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT根据错误进行熔断,对应ExceptionCircuitBreaker
统计rt,错误次数,错误率都是基于LeapArray(滑动窗口算法)进行统计。
在DegradeSlot&DefaultCircuitBreakerSlot的entry方法(限流操作会调用到此方法)此方法会轮流调用CircuitBreaker#tryPass进行短路校验。
这里都会使用AbstractCircuitBreaker#tryPass
CircuitBreaker具备三种状态:
Open,断路器打开,此时会判断是否大于接口恢复时间,如果大于那么修改为半开,让一个请求先试试水,如果成功那么从半开修改为开。Close:断路器关闭了,此时所有请求都可以通过。并且根据指标(rt,错误率,错误次数)来决定是否将断路器修改为开Half open,半开状态,此时除了试水的线程可以通过,其他线程都会抛出DegradeException,试水线程会根据是否调用异常,来决定修改为开还是关可以看出断路器最重要的是维护这三种状态,并且状态的切换需要保证线程安全
如果断路器处于Open,首先会通过retryTimeoutArrived方法判断当前时间是否大于恢复时间点,如果不是,说明短路了返回false,DegradeSlot会抛出DegradeException,阻止调用。如果大于恢复时间点,会调用fromOpenToHalfOpen 尝试cas open->half_open,并且注册回调,此回调会在 entry.close()的时候被触发。
在DegradeSlot&DefaultCircuitBreakerSlot的exit方法中,会触发circuitBreaker#onRequestComplete(如果执行顺利没有出现BlockException异常的话)
这里会根据DegradeRule调用不同的CircuitBreaker
ExceptionCircuitBreaker
从Entry当中拿到执行的错误,如果具备错误,那么更新滑动窗口中的错误数
private void handleStateChangeWhenThresholdExceeded(Throwable error) { //当前是开,那么什么也不做,降级slot在开的状态会判断当前时间和恢复时间,实现降级效果,so,这里不需要做什么 if (currentState.get() == State.OPEN) { return; } //半开 if (currentState.get() == State.HALF_OPEN) { // 半开但是执行过程中无异常,那么调整为close,说明被调用方法接口已经稳定了,可以修改为close if (error == null) { fromHalfOpenToClose(); } else { //如果具备异常,那么修改为开,并且更新接口恢复时间,实现熔断 fromHalfOpenToOpen(1.0d); } return; } //开状态,滑动窗口统计错误数,和总数 List counters = stat.values(); long errCount = 0; long totalCount = 0; //统计错误数,和总数 for (SimpleErrorCounter counter : counters) { errCount += counter.errorCount.sum(); totalCount += counter.totalCount.sum(); } //小于 最小请求数(默认5)认为样本太少,直接啥也不做 if (totalCount < minRequestAmount) { return; } //默认根据错误次数,curCount记录错误次数 double curCount = errCount; //根据错误比例,curCount记录错误率 if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) { // Use errorRatio curCount = errCount * 1.0d / totalCount; } //错误比例 or 错误率超过了阈值,那么调整为开,并且更新接口恢复时间,实现熔断 if (curCount > threshold) { transformToOpen(curCount); }}
ResponseTimeCircuitBreaker
和上面类似,只不过是根据rt来判断,而不是错误次数,错误率。同使用滑动窗口实现计数。
2.4 LeapArry是如何实现滑动窗口计数的ArrayMetirc被StatisticSlot调用addPass方法
这里的data便是LeapArray的子类BucketLeapArray,或者OccupiableBucketLeapArray
2.4.1构造方法从构造方法我们可以看出LeapArray的构成
可以看到数据的存储使用了AtomicReferenceArray,其中sampleCount = 2,intervalInMs = 1000,也就说默认只有两个窗口,时间跨度为1s。
2.4.2 获取当前时间对应的桶这一段就是滑动窗口的精髓
public WindowWrap currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } //根据当前时间,计算当前时间位于窗口数组中的下标 // 比如当前是 1600ms,窗口总大小为1000ms,一共两个格子,每一个格子大小为500ms //1600ms 落在(1600/500)% 2 = 1 int idx = calculateTimeIdx(timeMillis); // 计算当前时间 对应的起始 => 1600 - 1600%500 = 1500 // |0~500|500~1000|1000~1500|1500~2000| 1600位于1500~2000中所以起始为1500 long windowStart = calculateWindowStart(timeMillis); while (true) { //获取当前时间对应的桶 WindowWrap old = array.get(idx); //如果桶为null if (old == null) { //创建新元素 WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); //cas 设置到数组中 if (array.compareAndSet(idx, null, window)) { return window; } else { //如果cas失败,说明在另外一个线程也是这个index,让当前线程yield放弃cpu Thread.yield(); } } else if (windowStart == old.windowStart()) { //旧桶的start和当前桶一样:意味着是同一秒 //比如600ms也是位于下标1,但是start 是500,就和当前1600ms的start 1500不同 意味着不是同一秒 return old; } else if (windowStart > old.windowStart()) { //比如600ms也是位于下标1,但是start 是500, // 就和当前1600ms的start 1500,1500>500,意味着当前这个桶已经过期了 //上锁 if (updateLock.tryLock()) { try { //设置windowStart 并且清空旧桶 return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { //上锁失败,那么 Thread.yield(); } } else if (windowStart < old.windowStart()) { //当前start小于旧桶start //这种情况不应该发生,因为时间时越来越来大的 //除非当前线程 一值分配不到时间片? return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } }}
这里有一个变量windowStart,就如同一个版本,标志了元素是否过期(旧元素windowStart小于当前windowStart说明旧元素过期),是否位于一个桶(windowStart相同说明位于同一个下标,比如1600,和1700,windowStart都是1500)
2.4.3 改变桶记录的值上面看了是如何拿到当前时间对应下标的桶的,那么桶中记录的数据是如何变更的?
可以看到这里使用了LongAdder进行计数,因为一个桶可能存在多个线程并发更新,使用LongAdder实现热点数据分离,也减少缓存伪共享带来的开销。(JUC源码学习笔记4——原子类源码分析,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法)
2.4.4 如何获取当前qps上面我们直到了,通过的请求,根据时间对应了ArrayMetric滑动窗口数组中的一个元素,这个元素是MetricBucket类型,内部有一个LongAdder的数组,来记录pass,block(通过的请求,被拦截的请求)的数目,那么怎么根据这些数据获取当前qps昵?FlowSlot根据qps限流得使用这个数据呀!
我们看一下这里的pass方法
再看下values方法,入参就是当前时间,这个当前时间timeMills用来排除过期元素
拿通过次数自然是使用了LongAdder#sum方法,但是LongAdder#sum不具备瞬时一致性,也就说可能遍历到n位置,但是其他线程更改n-1位置,n-1的变化不会体现在sum的总和中。
这里有一段代码我觉得有点秒(个人理解,一点拙见不一定对)
就是这里统计pass会先执行data.currentWindow(),把当前时间对应的滑动窗口元素进行初始化。
比如执行data.currentWindow()当前时间点是1001ms,对应桶为null,那么这一步会初始化1001ms对应的桶元素。
后续values遍历的时间是1003ms,这时候就可以拿到1001ms初始化桶的引用返回,后续加入存在另外一个线程在1004ms进行更新,当前线程执行pass累加的时候就又更大概率统计到
为什么我说更大概率,因为如果不初始化的化,1004ms另外一个线程去初始化会new一个桶元素,这个过程是更耗时的,当前线程values遍历可能就没办法拿到1004ms这个线程初始化的桶了(有道理,但是不多doge)。
六丶总结学到了啥:
学习到了SPI和责任链,这两大解耦+扩展利器。学习到了自旋+CAS的操作。SPI由一方制定规范,比如SpringBoot的spring.factories,另一方进行扩展,实现服务使用方和提供方的解耦合,以及修改SPI文件内容进行扩展!责任链简直无处不在,在众多框架中,平时工作也常常使用到,使用责任链,责任单一,并且通过改变链条实现扩展!对熔断有了更深刻的理解,特别Open状态根据恢复时间判断+cas修改为半开,让一个线程试试水的操作,我大受震撼!了解到Sentinel还有除了针对单个资源进行限流,还有系统负载限流的功能,这个有点厉害,我一开始还想这个该咋实现,看了SystemSlot,发现自己还是太狭隘了。不足:
对Sentinel的Node理解不够,文档上说Sentine支持调用链路关系进行限流,这个功能挺牛的,但是我并没有详细阅读这部分源码。
对预热的实现没有深入理解,主要是Guava的预热模型,让二阳的我有点晕了,后续结合guava中的限流进行学习。