Spring Cloud Alibaba Sentinel 源码阅读之流量控制算法
流量统计数据结构
几个重要的类
每一个资源的访问统计都被封装成一个StatisticNode用来记录各项指标
class StatisticNode{ Metric rollingCounterInSecond = new ArrayMetric(2,1000);}
每个StatisticNode中有一个ArrayMetric负责具体的数据统计
class ArrayMetric{ LeapArray<MetricBucket> data = new OccupiableBucketLeapArray(2, 1000);}
LeapArray
负责维护一个循环数组,每个元素代表一个时间窗口的数据
class LeapArray{ protected int windowLengthInMs;//单个窗口的长度(毫秒) protected int sampleCount;// 窗口数量 protected int intervalInMs;//总的统计时长 = windowLengthInMs * sampleCount private double intervalInSecond; protected final AtomicReferenceArray<WindowWrap<T>> array; public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.intervalInSecond = intervalInMs / 1000.0; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); }}
WindowWrap: 是 LeapArray
数组中的一个元素,它包装了一个 MetricBucket
以及该窗口的开始时间戳。
public class WindowWrap<T> { private final long windowStart; // 窗口的开始时间 private final int windowLength; // 窗口的长度 private volatile T value; // 实际存储的 MetricBucket}
MetricBucket: 存储在一个时间窗口内(例如 200ms)的各项指标,如通过请求数、拒绝请求数、异常数等。它内部使用 LongAdder
来保证并发安全地更新计数。
StatisticSlot流量统计
看完了数据结构,继续来看计数流程。开始位置还要冲slot的entry方法
StatisticSlot#entry()
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable { try { //调用slot链 fireEntry(context, resourceWrapper, node, count, prioritized, args); // 请求通过,增加线程数和请求通过数 node.increaseThreadNum(); node.addPassRequest(count);... } catch (PriorityWaitException ex) { node.increaseThreadNum(); ... } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // 添加阻塞数 node.increaseBlockQps(count); ... throw e; } catch (Throwable e) { ... } }
这里不看其他的,就看node.addPassRequest(count);这一步请求通过计数。
这里node继承自StatisticNode会掉到其addPassRequest方法
StatisticNode#addPassRequest()
public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
根据上面的数据结构,rollingCounterInSecond是ArrayMetric实例,
ArrayMetric#addPass
public void addPass(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count);}
这里data实例是LeapArray类型,currentWindow()方法获取当前实际窗口数据对象
LeapArray#currentWindow()
public WindowWrap<T> currentWindow() {return currentWindow(TimeUtil.currentTimeMillis());}public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; }//根据时间戳获取当前时间对应的窗口下标 int idx = calculateTimeIdx(timeMillis); // 根据时间戳获取当前时间对应的窗口开始时间 long windowStart = calculateWindowStart(timeMillis); /* 从数组中获取指定时间点的桶(bucket)项。 1、如果桶不存在,则创建一个新的桶,并使用 CAS(比较并交换)更新到环形数组中。 2、如果桶匹配当前开始时间,直接返回该桶。 3、如果桶已过期,则重置当前桶,并清理所有过期的桶。 */ while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { return old; } else if (windowStart > old.windowStart()) { if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield(); } } else if (windowStart < old.windowStart()) { return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
这里有两个重要的方法calculateTimeIdx和calculateWindowStart
private int calculateTimeIdx(/*@Valid*/ long timeMillis) { //当前时间除以一个时间窗口的长度 long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); }
这样时间每增加一个时间窗口长度,数值下标会往前推进1。
计数窗口开始时间,这个也好理解,其实就是求上个时间窗口的结束时间,
protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs;}
拿到时间窗口buck后WindowWrap
调用MetricBucket.addPass(count)增加计数。
实际情况分析:
sentinel默认的滑动窗口周期是1000毫秒,样本数是2。这样一个窗口的时间长度是 1000/2 = 500毫秒,LeapArray数组长度为2,存储两个样本窗口数据。
所属窗口编号 = (当前时间/500)%2
窗口开始时间= (当前时间 - 当前时间%500)
不同时间请求对应窗口信息:
这里看到LeapArray数组中两个元素会被循环使用,过去的窗口数据会被清空覆盖掉。
FlowSlot流量控制
数据统计好了,下一步就是根据流量数据统计进行控制
FlowSlot#entry()
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable { // 校验限流规则 checkFlow(resourceWrapper, context, node, count, prioritized); //调用下一个slot fireEntry(context, resourceWrapper, node, count, prioritized, args); }
限流规则校验主要在FlowRuleChecker中完成。checkFlow()校验首先根据当前资源从FlowRuleManager获取适配当前资源的限流规则,然后逐一进行规则校验。
FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } //当前资源上配置的限流规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { //逐一校验 for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }
每一个FlowRule有一个对应的TrafficShapingController来执行判断。具体判断在canPass()方法中实现。
默认的策略是直接拒绝,DefaultController。其他的还有RateLimiterController
, WarmUpController
。
DefaultController#canPass()
public boolean canPass(Node node, int acquireCount, boolean prioritized) { //获取当前节点的请求数 int curCount = avgUsedTokens(node); //count是阈值,如果大于阈值直接返回false if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { .... } return false; } return true; }private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } //计数所有有效的pass数量 return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); }
流程算法总结
当一个请求进入 Sentinel 的流量统计 StatisticSlot
时:
- 获取当前时间。
- 通过
LeapArray.currentWindow()
方法获取当前时间对应的WindowWrap
。 - 如果当前窗口是新窗口或过期窗口,会被重置。
- 在获取到的
MetricBucket
上,调用pass.increment()
等方法,增加对应的指标计数。 - 在进入到FlowSlot进行限流判断时,会通过
LeapArray.values(currentTime)
获取所有有效(未过期)的MetricBucket
,然后遍历这些MetricBucket
,累加它们的pass
计数,从而得到在滑动窗口内的总 QPS。