> 技术文档 > Spring Cloud Alibaba Sentinel 源码阅读之流量控制算法

Spring Cloud Alibaba Sentinel 源码阅读之流量控制算法


流量统计数据结构

几个重要的类

每一个资源的访问统计都被封装成一个StatisticNode用来记录各项指标

class StatisticNode{ Metric rollingCounterInSecond = new ArrayMetric(2,1000);}

每个StatisticNode中有一个ArrayMetric负责具体的数据统计

class ArrayMetric{ LeapArray<MetricBucket> data = new OccupiableBucketLeapArray(2, 1000);}

LeapArray 负责维护一个循环数组,每个元素代表一个时间窗口的数据

class LeapArray{ protected int windowLengthInMs;//单个窗口的长度(毫秒) protected int sampleCount;// 窗口数量 protected int intervalInMs;//总的统计时长 = windowLengthInMs * sampleCount private double intervalInSecond; protected final AtomicReferenceArray<WindowWrap<T>> array; public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.intervalInSecond = intervalInMs / 1000.0; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); }}

WindowWrap: 是 LeapArray 数组中的一个元素,它包装了一个 MetricBucket 以及该窗口的开始时间戳。

public class WindowWrap<T> { private final long windowStart; // 窗口的开始时间 private final int windowLength; // 窗口的长度 private volatile T value; // 实际存储的 MetricBucket}

MetricBucket: 存储在一个时间窗口内(例如 200ms)的各项指标,如通过请求数、拒绝请求数、异常数等。它内部使用 LongAdder 来保证并发安全地更新计数。

StatisticSlot流量统计

看完了数据结构,继续来看计数流程。开始位置还要冲slot的entry方法

StatisticSlot#entry()

 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable { try { //调用slot链 fireEntry(context, resourceWrapper, node, count, prioritized, args); // 请求通过,增加线程数和请求通过数 node.increaseThreadNum(); node.addPassRequest(count);... } catch (PriorityWaitException ex) { node.increaseThreadNum(); ... } catch (BlockException e) { // Blocked, set block exception to current entry. context.getCurEntry().setBlockError(e); // 添加阻塞数 node.increaseBlockQps(count); ... throw e; } catch (Throwable e) { ... } }

这里不看其他的,就看node.addPassRequest(count);这一步请求通过计数。

这里node继承自StatisticNode会掉到其addPassRequest方法

StatisticNode#addPassRequest()

public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }

根据上面的数据结构,rollingCounterInSecond是ArrayMetric实例,

ArrayMetric#addPass

public void addPass(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count);}

这里data实例是LeapArray类型,currentWindow()方法获取当前实际窗口数据对象

LeapArray#currentWindow()

public WindowWrap<T> currentWindow() {return currentWindow(TimeUtil.currentTimeMillis());}public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; }//根据时间戳获取当前时间对应的窗口下标 int idx = calculateTimeIdx(timeMillis); // 根据时间戳获取当前时间对应的窗口开始时间 long windowStart = calculateWindowStart(timeMillis); /* 从数组中获取指定时间点的桶(bucket)项。 1、如果桶不存在,则创建一个新的桶,并使用 CAS(比较并交换)更新到环形数组中。 2、如果桶匹配当前开始时间,直接返回该桶。 3、如果桶已过期,则重置当前桶,并清理所有过期的桶。 */ while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) {  // Successfully updated, return the created bucket.  return window; } else {  // Contention failed, the thread will yield its time slice to wait for bucket available.  Thread.yield(); } } else if (windowStart == old.windowStart()) { return old; } else if (windowStart > old.windowStart()) { if (updateLock.tryLock()) {  try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart);  } finally { updateLock.unlock();  } } else {  Thread.yield(); } } else if (windowStart < old.windowStart()) { return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }

这里有两个重要的方法calculateTimeIdx和calculateWindowStart

 private int calculateTimeIdx(/*@Valid*/ long timeMillis) { //当前时间除以一个时间窗口的长度 long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); }

这样时间每增加一个时间窗口长度,数值下标会往前推进1。

计数窗口开始时间,这个也好理解,其实就是求上个时间窗口的结束时间,

protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs;}

拿到时间窗口buck后WindowWrap 调用MetricBucket.addPass(count)增加计数。

实际情况分析:

sentinel默认的滑动窗口周期是1000毫秒,样本数是2。这样一个窗口的时间长度是 1000/2 = 500毫秒,LeapArray数组长度为2,存储两个样本窗口数据。

所属窗口编号 = (当前时间/500)%2

窗口开始时间= (当前时间 - 当前时间%500)

不同时间请求对应窗口信息:

请求编号 请求时间 所属窗口编号 窗口开始时间 1 0 0 0 2 200 0 0 3 300 0 0 4 600 1 500 5 800 1 500 6 1100 0 1000 7 1600 1 1500 …

这里看到LeapArray数组中两个元素会被循环使用,过去的窗口数据会被清空覆盖掉。

FlowSlot流量控制

数据统计好了,下一步就是根据流量数据统计进行控制

FlowSlot#entry()

 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable { // 校验限流规则 checkFlow(resourceWrapper, context, node, count, prioritized); //调用下一个slot fireEntry(context, resourceWrapper, node, count, prioritized, args); }

限流规则校验主要在FlowRuleChecker中完成。checkFlow()校验首先根据当前资源从FlowRuleManager获取适配当前资源的限流规则,然后逐一进行规则校验。

FlowRuleChecker#checkFlow

 public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } //当前资源上配置的限流规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { //逐一校验 for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) {  throw new FlowException(rule.getLimitApp(), rule); } } } }

每一个FlowRule有一个对应的TrafficShapingController来执行判断。具体判断在canPass()方法中实现。

默认的策略是直接拒绝,DefaultController。其他的还有RateLimiterController, WarmUpController

DefaultController#canPass()

 public boolean canPass(Node node, int acquireCount, boolean prioritized) { //获取当前节点的请求数 int curCount = avgUsedTokens(node); //count是阈值,如果大于阈值直接返回false if (curCount + acquireCount > count) { if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { .... } return false; } return true; }private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } //计数所有有效的pass数量 return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); }

流程算法总结

当一个请求进入 Sentinel 的流量统计 StatisticSlot 时:

  1. 获取当前时间。
  2. 通过 LeapArray.currentWindow() 方法获取当前时间对应的 WindowWrap
  3. 如果当前窗口是新窗口或过期窗口,会被重置。
  4. 在获取到的 MetricBucket 上,调用 pass.increment() 等方法,增加对应的指标计数。
  5. 在进入到FlowSlot进行限流判断时,会通过 LeapArray.values(currentTime) 获取所有有效(未过期)的 MetricBucket,然后遍历这些 MetricBucket,累加它们的 pass 计数,从而得到在滑动窗口内的总 QPS。