10 限流降级与流量效果控制器(上)

从这篇开始,我们学习 Sentinel 提供的几个实现降级功能的 ProcessorSlot,这些 ProcessorSlot 检查实时指标数据是否达到规则所配置的阈值,当达到阈值时,或抛出 Block 异常或采取流量效果控制策略处理超阈值的流量。

Sentinel 实现限流降级、熔断降级、黑白名单限流降级、系统自适应限流降级以及热点参数限流降级都是由 ProcessorSlot、Checker、Rule、RuleManager 组合完成。ProcessorSlot 作为调用链路的切入点,负责调用 Checker 检查当前请求是否可以放行;Checker 则根据资源名称从 RuleManager 中拿到为该资源配置的 Rule(规则),取 ClusterNode 统计的实时指标数据与规则对比,如果达到规则的阈值则抛出 Block 异常,抛出 Block 异常意味着请求被拒绝,也就实现了限流或熔断。

可以总结为以下三个步骤:

  1. 在 ProcessorSlot#entry 方法中调用 Checker#check 方法,并将 DefaultNode 传递给 Checker。
  2. Checker 从 DefaultNode 拿到 ClusterNode,并根据资源名称从 RuleManager 获取为该资源配置的规则。
  3. Checker 从 ClusterNode 中获取当前时间窗口的某项指标数据(QPS、avgRt 等)与规则的阈值对比,如果达到规则的阈值则抛出 Block 异常(也有可能将 check 交给 Rule 去实现)。

限流规则与规则配置加载器

Sentinel 在最初的框架设计上,将是否允许请求通过的判断行为交给 Rule 去实现,所以将 Rule 定义成了接口。Rule 接口只定义了一个 passCheck 方法,即判断当前请求是否允许通过。Rule 接口的定义如下:

public interface Rule {
    boolean passCheck(Context context, DefaultNode node, int count, Object... args);
}

  • context:当前调用链路上下文。
  • node:当前资源的 DefaultNode。
  • count:一般为 1,用在令牌桶算法中表示需要申请的令牌数,用在 QPS 统计中表示一个请求。
  • args:方法调用参数(被 Sentinel 拦截的目标方法),用于实现热点参数限流降级的。

因为规则是围绕资源配置的,一个规则只对某个资源起作用,因此 Sentinel 提供了一个抽象规则配置类 AbstractRule,AbstractRule 的定义如下:

public abstract class AbstractRule implements Rule {
    private String resource;
    private String limitApp;
    // ....
}

  • resource:资源名称,规则的作用对象。
  • limitApp:只对哪个或者哪些调用来源生效,若为 default 则不区分调用来源。

Rule、AbstractRule 与其它实现类的关系如下图所示:

10-01-rule

FlowRule 是限流规则配置类,FlowRule 继承 AbstractRule 并实现 Rule 接口。FlowRule 源码如下,非完整源码,与实现集群限流相关的字段暂时去掉了。

public class FlowRule extends AbstractRule {
    // 限流阈值类型 qps|threads
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    // 限流阈值
    private double count;
    // 基于调用关系的限流策略
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    // 配置 strategy 使用,入口资源名称
    private String refResource;
    // 流量控制效果(直接拒绝、Warm Up、匀速排队)
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    // 冷启动时长(预热时长),单位秒
    private int warmUpPeriodSec = 10;
    // 最大排队时间。
    private int maxQueueingTimeMs = 500;
    // 流量控制器
    private TrafficShapingController controller;
    //.....
    @Override
    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
        return true;
    }
}

  • FlowRule 的字段解析已在源码中给出注释,现在还不需要急于去理解每个字段的作用。
  • FlowRule 实现 Rule 接口方法只是返回 true,因为 passCheck 的逻辑并不由 FlowRule 实现。

Rule 定义的行为应该只是 Sentinel 在最初搭建框架时定义的约定,Sentinel 自己也并没有都遵守这个约定,很多规则并没有将 passCheck 交给 Rule 去实现,Checker 可能是后续引入的,用于替代 Rule 的 passCheck 行为。

Sentinel 中用来管理规则配置的类都以规则类的名称+Manger 命名,除此之外,并没有对规则管理器有什么行为上的约束。

用来加载限流规则配置以及缓存限流规则配置的类为 FlowRuleManager,其部分源码如下:

public class FlowRuleManager {
    // 缓存规则
    private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
    // 获取所有规则
    static Map<String, List<FlowRule>> getFlowRuleMap() {
        return flowRules;
    }
    // 更新规则
    public static void loadRules(List<FlowRule> rules) {
        // 更新静态字段 flowRules
    }
}

  • flowRules 静态字段:用于缓存规则配置,使用 ConcurrentMap 缓存,key 为资源的名称,value 是一个 FlowRule 数组。使用数组是因为 Sentinel 支持针对同一个资源配置多种限流规则,只要其中一个先达到限流的阈值就会触发限流。
  • loadRules:提供给使用者加载和更新规则的 API,该方法会将参数传递进来的规则数组转为 Map,然后先清空 flowRules 当前缓存的规则配置,再将新的规则配置写入 flowRules。
  • getFlowRuleMap:提供给 FlowSlot 获取配置的私有 API。

限流处理器插槽:FlowSlot

FlowSlot 是实现限流功能的切入点,它作为 ProcessorSlot 插入到 ProcessorSlotChain 链表中,在 entry 方法中调用 Checker 去判断是否需要拒绝当前请求,如果需要拒绝请求则抛出 Block 异常。FlowSlot 的源码如下:

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    private final FlowRuleChecker checker;
    public FlowSlot() {
        this(new FlowRuleChecker());
    }

   // 规则生产者,一个 Function
    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
        // 参数为资源名称
        @Override
        public Collection<FlowRule> apply(String resource) {
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }
    };

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
  // check 是否限流
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

FlowSlot 在构造方法中创建 FlowRuleChecker,并在 entry 方法中调用 FlowRuleChecker#checkFlow 方法判断是否需要拦截当前请求。在调用 FlowRuleChecker#checkFlow 方法时传入了一个 Function 接口实例,FlowRuleChecker 可调用该 Function 的 apply 方法从 FlowRuleManager 获取资源的所有规则配置,当然,最终还是调用 FlowRuleManager#getFlowRuleMap 方法从 FlowRuleManager 获取。

限流规则检查器:FlowRuleChecker

FlowRuleChecker 与 FlowRuleManager 一样,Sentinel 也并没有约定 Checker 必须具有哪些行为,只是在命名上约定 Checker 类需以规则类的名称 + “Checker”命名。FlowRuleChecker 负责判断是否需要拒绝当前请求,由于 FlowRuleChecker 类的源码很多,所以我们按过程分析用到的每个方法。

首先是由 FlowSlot 调用的 checkFlow 方法,该方法源码如下:

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        // (1)
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            // (2)
            for (FlowRule rule : rules) {
                // (3)
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
}

checkFlow 方法我们分三步分析:

  • 调用 FlowSlot 传递过来的 ruleProvider 的 apply 方法获取当前资源的所有限流规则;
  • 遍历限流规则,只要有一个限流规则达到限流阈值即可抛出 FlowException,使用 FlowException 目的是标志当前请求因为达到限流阈值被拒绝,FlowException 是 BlockException 的子类;
  • 调用 canPassCheck 方法判断当前请求是否允许通过。

canPassCheck 即“can pass check”,意思是检查是否允许通过,后面我们也统一将“检查是否允许当前请求通过”使用 canPassCheck 代指,canPassCheck 方法返回 true 说明允许请求通过,反之则不允许通过。canPassCheck 方法源码如下:

public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {
        // (1)
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        // (2)
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        // (3)
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

  • 当前限流规则只对哪个调用来源生效,如果为 null 则返回 true,一般不为 null,默认为“default”(不限定调用来源);
  • 是否是集群限流模式,如果是集群限流模式则调用 passClusterCheck 方法完成 canPassCheck,我们暂时先不讨论集群限流的情况;
  • 非集群限流模式则调用 passLocalCheck 方法完成 canPassCheck。

passLocalCheck 方法源码如下:

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // (1)
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        // (2)
        return rule.getRater()
          // (3)
          .canPass(selectedNode, acquireCount, prioritized);
    }

  • 根据调用来源和“调用关系限流策略”选择 DefaultNode;
  • 获取限流规则配置的流量效果控制器(TrafficShapingController);
  • 调用 TrafficShapingController#canPass 方法完成 canPassCheck。

selectNodeByRequesterAndStrategy 方法的实现逻辑很复杂,实现根据限流规则配置的 limitApp 与 strategy 选择一个 StatisticNode,两个字段的组合情况可以有 6 种。selectNodeByRequesterAndStrategy 方法源码如下:

static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
        // 限流规则针对哪个来源生效
        String limitApp = rule.getLimitApp();
        // 基于调用关系的限流策略
        int strategy = rule.getStrategy();
        // 远程来源
        String origin = context.getOrigin();
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                //(1)
                return context.getOriginNode();
            }
            //(2)
            return selectReferenceNode(rule, context, node);
        }
        else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                //(3)
                return node.getClusterNode();
            }
            //(4)
            return selectReferenceNode(rule, context, node);
        }
        else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                 //(5)
                return context.getOriginNode();
            }
            //(6)
            return selectReferenceNode(rule, context, node);
        }
        return null;
}

如果当前限流规则的 limitApp 为 default,则说明该限流规则对任何调用来源都生效,针对所有调用来源限流,否则只针对指定调用来源限流。

\1. 如果调用来源与当前限流规则的 limitApp 相等,且 strategy 为 STRATEGY_DIRECT,则使用调用来源的 StatisticNode,实现针对调用来源限流。例如,当前服务名称为 demo-srv-b,请求调用来源为 demo-srv-a 服务,资源名称为“/hello”,那么 origin 的 StatisticNode 用于针对访问来源为 demo-srv-a 的“/hello”资源的指标数据统计。

\2. 前置条件与(1)相同,依然是针对来源限流。

  • strategy 为 STRATEGY_RELATE:根据限流规则配置的 refResource 获取引用资源的 ClusterNode,即使用引用资源的指标数据限流。通俗点说就是使用其它资源的指标数据限流,你的并发量高我就限流,让你多处理一点请求,等你并发量降低了,我就不限流了;
  • strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode,实现按调用链路的资源指标数据限流。

\3. 当 limitApp 为 default 时,针对所有来源限流。如果 strategy 为 STRATEGY_DIRECT,则使用当前资源的 ClusterNode。

\4. 前置条件与(3)相同,依然是针对所有来源限流。

  • strategy 为 STRATEGY_RELATE:使用引用资源的 ClusterNode;
  • strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode。

\5. 如果 limitApp 为 other,且该资源的所有限流规则都没有针对当前的调用来源限流。如果 strategy 为 STRATEGY_DIRECT,则使用 origin 的 StatisticNode。

\6. 前置条件与(5)一样。

  • strategy 为 STRATEGY_RELATE:使用引用资源的 ClusterNode;
  • strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode。

从 selectNodeByRequesterAndStrategy 方法可以看出,Sentinel 之所以针对每个资源统计访问来源的指标数据,也是为了实现对丰富的限流策略的支持。

因为每个调用来源服务对同一个资源的访问频率都是不同的,针对调用来源限流可限制并发量较高的来源服务的请求,而对并发量低的来源服务的请求可不限流,或者是对一些并没有那么重要的来源服务限流。

当两个资源之间具有资源争抢关系的时候,使用 STRATEGY_RELATE 调用关系限流策略可避免多个资源之间过度的对同一资源争抢。例如查询订单信息和用户下单两个分别读和写数据库订单表的资源,如下图所示。

10-02-基于调用关系限流

我们可以给执行读表操作的资源设置限流规则实现写优先的目的,查询订单信息的资源根据用户下单的资源的实时指标数据限流,当写表操作过于频繁时,读表操作的请求就会被限流。