Sentinel源码解析——FlowSlot流控规则
在前面的文章,我们对Sentinel的原理进行了分析,Sentinel底层使用了责任链模式,这个责任链就是ProcessorSlotChain对象,链中的每个节点都是一个ProcessorSlot,每个ProcessorSlot对应一个规则的处理。
然后我们又对Sentinel的整体流程进行了源码分析,我们分析了Sentinel的ProcessorSlotChain对象默认的构成:
但是我们没有对每个slot进行深入的分析,本篇文章就对它们进行深入的了解。
StatisticNode与StatisticSlot
FlowSlot是根据StatisticSlot中的统计数据进行流控规则校验的,而StatisticSlot的统计数据又是维护在StatisticNode对象中,呈现以下关系:
StatisticNode内部结构
public class StatisticNode implements Node {
// 统计1秒内数据的滑动时间窗,1秒内有两个时间窗格,每个时间窗格的时间跨度是500ms
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
// 统计1分钟内数据的滑动时间窗,1分钟内有60个时间窗格,每个时间窗格的时间跨度时1s
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
/**
* 当前并发线程数计数器
*/
private LongAdder curThreadNum = new LongAdder();
...
}
StatisticNode有三个成员变量,其中rollingCounterInSecond和rollingCounterInMinute都是滑动时间窗计数器,用于分别统计1秒内和1分钟内的数据。
而curThreadNum则是一个LongAdder类型的并发线程数计数器。
rollingCounterInSecond和rollingCounterInMinute都是Metric类型,Metric是一个接口,实现类是ArrayMetric。
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
...
}
ArrayMetric里面是一个LeapArray<MetricBucket>。
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
private double intervalInSecond;
protected final AtomicReferenceArray<WindowWrap<T>> array;
public LeapArray(int sampleCount, int intervalInMs) {
...
// 单个时间窗口长度 500ms
this.windowLengthInMs = intervalInMs / sampleCount;
// 以毫秒为单位的计数器统计的时间跨度 1000ms
this.intervalInMs = intervalInMs;
// 以秒为单位的计数器统计的时间跨度 1s
this.intervalInSecond = intervalInMs / 1000.0;
// 时间窗口个数 2个
this.sampleCount = sampleCount;
// 时间窗口数组
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
LeapArray中的成员属性都是根据构造器参数sampleCount(时间窗口个数)和intervalInMs(以毫秒为单位的计数器统计的时间跨度)计算得出的。
其中sampleCount和intervalInMs直接作为LeapArray的成员属性保存。
windowLengthInMs单个时间窗口长度,自然是intervalInMs除以sampleCount获得。rollingCounterInSecond的intervalInMs是1000,sampleCount是2,因此windowLengthInMs为500,表示500ms一个时间窗口。
intervalInSecond是以秒为单位的计数器统计的时间跨度,因此是intervalInMs除以1000,这里的intervalInSecond就是1,代表当前计数器统计的时间范围是1秒。
array是一个AtomicReferenceArray,他就是时间窗口数组,每个窗口是一个WindowWrap对象,泛型是MetricBucket,sampleCount就是AtomicReferenceArray的数组长度,因此是两个时间窗口。
WindowWrap里面的这个MetricBucket就是真正记录统计数值的。
public class MetricBucket {
// 记录统计数组的数组,每个下标对应一个指标的统计值
private final LongAdder[] counters;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
}
...
}
MetricBucket里面还有结构,并不是一个单一的value,而是一个LongAdder[],数组中的每个LongAdder对应一个指标的统计,具体有哪些指标,可以查看MetricEvent中的枚举。
public enum MetricEvent {
// 规则校验通过数
PASS,
// 规则校验失败数
BLOCK,
// 异常数
EXCEPTION,
// 成功数
SUCCESS,
// 所有成功调用的响应时间
RT,
OCCUPIED_PASS
}
StatisticSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 增加当前线程数
node.increaseThreadNum();
// 增加规则校验通过数
node.addPassRequest(count);
...
} catch (PriorityWaitException ex) {
...
} catch (BlockException e) {
...
// 增加流控规则校验失败计数
node.increaseBlockQps(count);
...
} catch (Throwable e) {
// 设置异常到context
context.getCurEntry().setError(e);
throw e;
}
}
StatisticSlot的entry方法与其他的slot处理流程不大一样,它是先调用fireEntry方法让slot链继续往后执行。然后后面才进行相关指标的统计。
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
Node node = context.getCurNode();
if (context.getCurEntry().getBlockError() == null) {
// 计算响应时间rt
long completeStatTime = TimeUtil.currentTimeMillis();
context.getCurEntry().setCompleteTimestamp(completeStatTime);
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();
// 从context中取出error,(如果抛异常,上面的entry方法会设置到context中)
Throwable error = context.getCurEntry().getError();
recordCompleteFor(node, count, rt, error);
// ...
}
// ...
fireExit(context, resourceWrapper, count, args);
}
private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
...
// 增加响应时间和成功数
node.addRtAndSuccess(rt, batchCount);
// 减去当前线程数
node.decreaseThreadNum();
if (error != null && !(error instanceof BlockException)) {
// 如果有异常,增加异常数
node.increaseExceptionQps(batchCount);
}
}
StatisticSlot的exit方法最后做的就是增加响应时间和成功数以及减去当前线程数;如果context中有异常(就是entry方法塞进去的)还会增加异常数。
我们发现StatisticSlot做的这些指标统计,全是调用node对象的方法,这个node对象就是StatisticNode。
node.increaseThreadNum()增加并发线程数:
StatisticNode#increaseThreadNum()
@Override
public void increaseThreadNum() {
curThreadNum.increment();
}
并发线程数是直接加到StatisticNode中的curThreadNum变量中。
而其他的指标都是加到滑动时间窗计数器里面,我们挑一个增加规则校验通过数的node.addPassRequest(count)来看。
StatisticNode#addPassRequest():
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
两个滑动时间窗计数器都增加。
ArrayMetric#addPass(int)
@Override
public void addPass(int count) {
// 根据当前时间戳定位对应的时间窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 时间窗口中的paas计数加1
wrap.value().addPass(count);
}
先是根据当前时间戳定位对应的时间窗口,然后把时间窗口中的pass计数加1。
看下是如何根据当前时间戳定位对应的时间窗口的:
LeapArray#currentWindow()
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
// 根据当前时间戳计算时间窗数组下标
int idx = calculateTimeIdx(timeMillis);
// 计算窗口开始时间windowStart
long windowStart = calculateWindowStart(timeMillis);
while (true) {
// 根据下标取得时间窗
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 定位到的时间窗口为空,创建,窗口开始时间就是windowStart
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
...
} else if (windowStart == old.windowStart()) {
// 时间窗口的开始时间等于windowStart,表示这个时间窗没过期,返回该时间窗
return old;
} else if (windowStart > old.windowStart()) {
// 时间窗口的开始时间小于windowStart,表示这个时间窗已过期,重置里面的计数,然后再返回这个时间窗口
return resetWindowTo(old, windowStart);
} ...
}
}
首先根据当前时间戳计算时间窗数组下标idx,通过下标就可以取得对应的时间窗old = array.get(idx)。
除此以外,还会根据当前时间戳计算一个窗口开始时间windowStart,然后每个时间窗创建的时候都会记录一个开始时间old.windowStart(),两个开始时间一比较,就可得知当前时间窗是否已过期。如果old.windowStart()小于windowStart,那么表示时间窗口old已经过期了。
根据当前时间戳计算目标窗口下标:
LeapArray#calculateTimeIdx(long)
private int calculateTimeIdx(long timeMillis) {
// 当前时间戳除以当个窗口的时间跨度(500ms),得到timeId
long timeId = timeMillis / windowLengthInMs;
// timeId对时间窗数组长度取模,得到下标idx
return (int)(timeId % array.length());
}
根据当前时间戳计算窗口开始时间windowStart:
LeapArray#calculateWindowStart(long)
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
// 当前时间戳 - 当前时间戳 % 当个时间窗的时间跨度
return timeMillis - timeMillis % windowLengthInMs;
}
得到时间窗后,就是执行“wrap.value().addPass(count);”这行代码,首先时间窗WindowWrap的泛型是MetricBucket,因此wrap.value()取到的是MetricBucket对象,然后调用MetricBucket的addPass(count)方法。
MetricBucket#addPass(int)
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
// counters是个LongAdder[]
// PASS对应枚举值0
// 因此这里就是对MetricBucket中的LongAdder数组counters中下标为0的LongAdder增加n
counters[event.ordinal()].add(n);
return this;
}
我们上面已经说过MetricBucket中是一个LongAdder[]记录不同指标的计数。而Event.PASS对应的枚举值是0,因此这里就是对LongAdder数组counters中下标为0的LongAdder增加n,正好对应的就是paas指标的LongAdder。
FlowSlot流控规则
FlowSlot的流控规则校验逻辑全在entry方法中,而exit直接调用fireExit方法往下走,因此我们只看FlowSlot的entry方法即可。
FlowSlot#entry(…)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 流控规则校验
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(...);
}
FlowSlot#checkFlow(…)
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// 流控规则校验
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
FlowRuleChecker#checkFlow(…)
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
...
// 从ruleProvider中根据资源名称取得对应的流控规则集合
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 逐个校验每个流控规则,一旦有一个校验不通过,则抛出FlowException
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
沿着FlowSlot的entry方法一路进来,到FlowRuleChecker的checkFlow方法。主体流程就是先根据资源名称取到对应的流控规则集合,然后再遍历这个流控规则集合,逐一校验每个流程规则,如果有哪一个规则校验没有通过,那么就抛出FlowException。
FlowRuleChecker#canPassCheck(FlowRule, …)
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
...
// 单个流控规则校验
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
...
// 单个流控规则校验
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
“return rule.getRater().canPass(selectedNode, acquireCount, prioritized);”这一行代码有可能进入不同的实现类,视我们选择的“流控效果”而定。
- DefaultController#canPass(Node, int, boolean):快速失败(滑动时间窗算法)
- WarmUpController#canPass(Node, int, boolean):Warm Up(令牌桶算法)
- RateLimiterController#canPass(Node, int, boolean):排队等待(漏桶算法)
DefaultController#canPass(Node, int, boolean):
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 当前的QPS(或者并发线程数,看我们选的“阈值类型”是什么)
int curCount = avgUsedTokens(node);
// 加上要增加的数目,如果超了,返回false表示校验失败,没超则返回true表示校验通过
if (curCount + acquireCount > count) {
...
return false;
}
return true;
}
DefaultController#canPass(Node, int, boolean)方法是流控效果为“快速失败”对应的流控规则校验类,使用的时滑动时间窗算法。首先计算获取当前的QPS或者并发线程数,这个视乎我们选的“阈值类型”而定,然后加上当前要申请的数目acquireCount,如果超过了阈值,返回false表示校验失败,没超则返回true表示校验通过。
DefaultController#avgUsedTokens(Node):
private int avgUsedTokens(Node node) {
...
// node.curThreadNum()取得的时并发线程数
// node.passQps()取得的是QPS
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
node.curThreadNum()其实就是取的StatisticNode中curThreadNum属性的值,它是一个LongAdder类型,上面已经介绍过。
、StatisticNode#curThreadNum()
@Override
public int curThreadNum() {
return (int)curThreadNum.sum();
}
node.passQps()则要进行计算。
StatisticNode#passQps()
@Override
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
rollingCounterInSecond.pass()取得的是所有的时间窗口pass计数的汇总:
ArrayMetric#pass()
@Override
public long pass() {
// 这里是判断如果当前的时间窗口过期,则重置它
data.currentWindow();
// 所有的时间窗口的paas计数加总到pass
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
window.pass()则是取得单个MetricBucket中的pass计数。
MetricBucket#pass():
public long pass() {
return get(MetricEvent.PASS);
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
这样就取得了计数器中每个时间窗口的pass计数加总后的值,但这个并不是QPS,因为有可能这个计数器的时间跨度是大于1秒的,因此还要除以rollingCounterInSecond.getWindowIntervalInSec()。
rollingCounterInSecond.getWindowIntervalInSec()取得的是整个计数器的时间跨度(以秒为单位):
ArrayMetric#getWindowIntervalInSec()
@Override
public double getWindowIntervalInSec() {
return data.getIntervalInSecond();
}
data是LeapArray类型,进入LeapArray的getIntervalInSecond方法:
LeapArray#getIntervalInSecond()
public double getIntervalInSecond() {
return intervalInSecond;
}
返回的是LeapArray中的intervalInSecond,也就是计数器统计的时间跨度(以秒为单位),那么“rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec()”得到的就是平均每秒的通过数(pass),也就是QPS。
以下是FlowSlot流控规则校验的整体流程,结合StatisticNode内部结构的那张大图看,思路就非常清晰了。