14 黑白名单限流与热点参数限流

黑白名单限流

黑白名单过滤是使用最为广泛的一种过滤规则,例如,用于实现接口安全的 IP 黑白名单规则过滤,用于防骚扰的短信、来电拦截黑白名单过滤。所以 Sentinel 中的黑白名单限流并不难理解,如果配置了黑名单,且请求来源存在黑名单中,则拦截(拒绝)请求,如果配置了白名单,且请求来源存在白名单中则放行。Sentinel 不支持一个黑白名单规则同时配置黑名单和白名单,因此不存优先级的问题。

黑白名单过滤功能更像是一种授权机制,它简单的将权限分为有权限和无权限两种情况,如果支持冲突,可使用优先级策略解决冲突问题。Sentinel 把黑白名作为授权策略,实现黑白名单限流即实现授权限流。Sentinel 在命名上也是使用 Authority,而非 BlackWhiteList。

一些关键类说明:

  • AuthoritySlot:实现黑白名称授权功能的切入点(ProcessorSlot)
  • AuthorityRule:授权规则类
  • AuthorityRuleChecker:授权检测类
  • AuthorityRuleManager:授权规则管理者,提供 loadRuls API
  • AuthorityException:授权检测异常,继承 BlockException

AuthorityRule

授权规则(AuthorityRule)是 Sentinel 中最易于理解的一种规则,AuthorityRule 的配置项如下:

public class AuthorityRule extends AbstractRule {
    private int strategy = RuleConstant.AUTHORITY_WHITE;
}

  • resource:资源名称,从父类继承而来。
  • limitApp:限制的来源名称,在 AuthorityRule 中可配置多个,使用‘,’号分隔。
  • strategy:限流策略,白名单:AUTHORITY_WHITE,黑名单:AUTHORITY_BLACK。

当 strategy 配置为 AUTHORITY_WHITE 时,limitApp 即为白名单;当 strategy 配置为 AUTHORITY_BLACK 时,limitApp 即为黑明单。例如:

AuthorityRule rule = new AuthorityRule();
// 资源名称
rule.setResource("GET:/hello");
// 白名单策略
rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
// 白名单
rule.setLimitApp("serviceA,serviceC");
AuthorityRuleManager.loadRules(Collections.singletonList(rule));

上述规则用于限制资源 “GET:/hello” 只允许服务 A 和服务 C 访问。

AuthoritySlot

在使用默认的 SlotChainBuilder 情况下,AuthoritySlot 被放在 SystemSlot、FlowSlot、DegradeSlot 的前面,其优先级更高。

原因之一是授权限流不需要使用统计的指标数据,另一个原因则是提升性能,在未授权的情况下没必要判断是否需要熔断、系统负载能否接住这个请求、QPS 是否过高等,这与用户授权功能是一样的道理,未登陆无需判断是否有权限访问某个资源。

AuthoritySlot 的实现源码如下:

public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        // (1)
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
        if (authorityRules == null) {
            return;
        }
        // (2)
        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }
        // (3)
        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
}

  • 从 AuthorityRuleManager 获取当前配置的所有授权规则;
  • 获取为当前资源配置的所有授权规则;
  • 遍历授权规则,调用 AuthorityRuleChecker#passCheck 方法判断是否拒绝当前请求,是则抛出 AuthorityException 异常。

AuthorityRuleChecker

AuthorityRuleChecker 负责实现黑白名单的过滤逻辑,其 passCheck 方法源码如下:

    static boolean passCheck(AuthorityRule rule, Context context) {
        // 获取来源
        String requester = context.getOrigin();
        // 来源为空,或者来源等于规则配置的 limitApp 则拦截请求
        if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
            return true;
        }
        // 字符串查找,这一步起到快速过滤的作用,提升性能
        int pos = rule.getLimitApp().indexOf(requester);
        boolean contain = pos > -1;
        // 存在才精确匹配
        if (contain) {
            boolean exactlyMatch = false;
            // 分隔数组
            String[] appArray = rule.getLimitApp().split(",");
            for (String app : appArray) {
                if (requester.equals(app)) {
                    // 标志设置为 true
                    exactlyMatch = true;
                    break;
                }
            }
            contain = exactlyMatch;
        }
        // 策略
        int strategy = rule.getStrategy();
        // 如果是黑名单,且来源存在规则配置的黑名单中
        if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
            return false;
        }
        // 如果是白名单,且来源不存在规则配置的白名单中
        if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
            return false;
        }
        return true;
    }

整个方法都比较简单,首先是从当前 Context 获取调用来源的名称,只有在调用来源不为空且规则配置了黑名单或者白名单的情况下,才会走黑白名单的过滤逻辑,这也说明,要实现黑白名单限流的前提是,每个服务消费端在发起请求时都必须要携带自身服务的名称,这取决于 Sentinel 主流框架适配器;其次,Sentinel 通过使用 indexOf 先简单匹配一次黑名单或白名单,再切割黑名单或白名单数组实现精确匹配,这有助于提升性能;如果当前请求来源存在名单中,则根据策略判断这份名称是黑名单还是白名单,再决定是否需要拒绝请求。

热点参数限流

热点参数限流并非在 Sentinel 的 core 模块中实现的,但也是非常实用的一种限流方式。并且,Sentinel 支持 API Gateway 网关限流也是基于参数限流实现的,了解热点参数限流的实现原理,也有助于我们更好地理解网关限流。

参数限流,即根据方法调用传递的参数实现限流,又或者说是根据接口的请求参数限流;热点参数限流,即针对访问频繁的参数限流。

例如,都是调用一个下单接口,但购买的商品不同,比如主播带货的商品下单流量较大,而一般商品购买量很少,同时因为商品数量有限,不太可能每个下单请求都能购买成功,如果能实现根据客户端请求传递的商品 ID 实现限流,将流量控制在商品的库存总量左右,并且使用 QPS 限流等兜底,这种有针对性的限流将接口通过的有效流量最大化。

热点参数限流功能在 Sentinel 源码的扩展功能模块为 sentinel-extension,子模块为 sentinel-parameter-flow-control。

基于滑动窗口的热点参数指标数据统计

热点参数限流使用的指标数据不再是 core 模块中统计的指标数据,而是重新实现了一套指标数据统计功能,依旧是基于滑动窗口。

  • ParamMapBucket:实现参数指标数据统计的 Bucket,用于统计某个参数对应不同取值的被限流总数、被放行的总数。
  • HotParameterLeapArray:实现滑动窗口,持有 WindowWrap 数组,WindowWrap 包装 ParamMapBucket。

与 core 模块的 MetricBucket 实现不同,MetricBucket 只统计每个指标的数值,而 ParamMapBucket 需要统计每个指标、参数的每种取值的数值,MetricBucket 更像是 Redis 中的 String 结构,而 ParamMapBucket 更像 Redis 中的 Hash 结构。

ParamMapBucket 的源码如下:

public class ParamMapBucket {

    // 数组类型为 CacheMap<Object, AtomicInteger>
    private final CacheMap<Object, AtomicInteger>[] data;

    public ParamMapBucket() {
        this(DEFAULT_MAX_CAPACITY);
    }

    public ParamMapBucket(int capacity) {
        RollingParamEvent[] events = RollingParamEvent.values();
        // 根据需要统计的指标数据创建数组
        this.data = new CacheMap[events.length];
        // RollingParamEvent 可取值为 REQUEST_PASSED、REQUEST_BLOCKED
        for (RollingParamEvent event : events) {
            data[event.ordinal()] = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(capacity);
        }
    }

}

  • data:数组元素类型为 CacheMap<Object, AtomicInteger>,下标为 0 存储的是统计请求通过的指标数据,下标为 1 统计的是请求被拒绝的指标数据。
  • CacheMap<Object, AtomicInteger>:key 为参数的取值,例如商品的 ID,value 才是指标数值。

HotParameterLeapArray 继承 LeapArray,即实现滑动窗口。ParamMapBucket 不存储窗口时间信息,窗口时间信息依然由 WindowWrap 存储,HotParameterLeapArray 使用 WindowWrap 包装 ParamMapBucket。

笔者也是看了 HotParameterLeapArray 之后才明白为什么 Sentienl 将滑动窗口抽象为 LeapArray,这为扩展实现收集自定义指标数据的滑动窗口提供了支持。

HotParameterLeapArray 的提供的几个 API 如下:

public class HotParameterLeapArray extends LeapArray<ParamMapBucket> {
   //.....
    public void addValue(RollingParamEvent event, int count, Object value) {
        // ....
    }

    public Map<Object, Double> getTopValues(RollingParamEvent event, int number) {
       // .....
    }

    public long getRollingSum(RollingParamEvent event, Object value) {
        // .....
    }

    public double getRollingAvg(RollingParamEvent event, Object value) {
        // ....
    }
}

  • addValue:添加参数的指标数值,例如,给 REQUEST_PASSED 指标且参数取值为 4343433 的指标数值加上 count,假设这个滑动窗口是用于统计商品 ID 参数的,4343433 表示商品 ID,count 为 1,调用该方法表示给商品 ID 为 4343433 的请求通过总数加 1。
  • getTopValues:获取热点参数的 QPS,即获取某个指标排名前 number 的参数取值与指标数据。例如,获取 REQUEST_PASSED 指标排名前 10 的 QPS,方法返回值类型为 Map,key 为参数的取值,value 为 QPS。
  • getRollingSum:计算某个指标、参数的某个取值的总请求数。例如,获取 REQUEST_PASSED 且商品 ID 为 4343433 的请求总数。
  • getRollingAvg:获取某个指标、参数的某个取值的平均 QPS。例如,获取 REQUEST_PASSED 且商品 ID 为 4343433 的平均 QPS。

可见,如果是分钟级的滑动窗口,一分内参数的取值越多,其占用的内存就越多。

参数限流中的 Node

两个需要重点关注的类:

  • ParameterMetric:用于实现类似 ClusterNode 的统计功能。
  • ParameterMetricStorage:用于实现类似 EntranceNode 功能,管理和存储每个资源对应的 ParameterMetric。

ParameterMetric 有三个静态字段,源码如下:

public class ParameterMetric {

    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
    private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();

}

  • ruleTimeCounters:用于实现匀速流量控制效果,key 为参数限流规则(ParamFlowRule),值为参数不同取值对应的上次生产令牌的时间。
  • ruleTokenCounter:用于实现匀速流量控制效果,key 为参数限流规则(ParamFlowRule),值为参数不同取值对应的当前令牌桶中的令牌数。
  • threadCountMap:key 为参数索引,值为参数不同取值对应的当前并行占用的线程总数。

ParameterMetricStorage 使用 ConcurrentHashMap 缓存每个资源对应的 ParameterMetric,只会为配置了参数限流规则的资源创建 ParameterMetric。其部份源码如下所示:

public final class ParameterMetricStorage {
    private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
    private static final Object LOCK = new Object();

    public static void initParamMetricsFor(ResourceWrapper resourceWrapper,ParamFlowRule rule) {
        if (resourceWrapper == null || resourceWrapper.getName() == null) {
            return;
        }
        String resourceName = resourceWrapper.getName();
        ParameterMetric metric;
        // 双重检测,线程安全,为资源创建全局唯一的 ParameterMetric
        if ((metric = metricsMap.get(resourceName)) == null) {
            synchronized (LOCK) {
                if ((metric = metricsMap.get(resourceName)) == null) {
                    metric = new ParameterMetric();
                    metricsMap.put(resourceWrapper.getName(), metric);
                }
            }
        }
        // 初始化 ParameterMetric
        metric.initialize(rule);
    }
}

initParamMetricsFor 方法用于为资源创建 ParameterMetric 并初始化,该方法在资源被访问时由 ParamFlowSlot 调用,并且该方法只在为资源配置了参数限流规则的情况下被调用。

热点参数限流功能的实现

sentinel-parameter-flow-control 模块通过 Java SPI 注册自定义的 SlotChainBuilder,即注册 HotParamSlotChainBuilder,将 ParamFlowSlot 放置在 StatisticSlot 的后面,这个 ParamFlowSlot 就是实现热点参数限流功能的切入点。

public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            return;
        }
        checkFlow(resourceWrapper, count, args);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

既然是参数限流,那么肯定是需要能够拿到参数了,而 ProcessorSlot#entry 方法的最后一个参数就是请求传递过来的参数,通过 SphU#entry 方法一层层往下传递。例如:

    @GetMapping("/hello")
    public String apiHello(String name) throws BlockException {
        ContextUtil.enter("my_context");
        Entry entry = null;
        try {
            entry = SphU.entry("GET:/hello", EntryType.IN,1,name);
            doBusiness();
            return "Hello!";
        } catch (Exception e) {
            if (!(e instanceof BlockException)) {
                Tracer.trace(e);
            }
            throw e;
        } finally {
            if (entry != null) {
                entry.exit(1);
            }
            ContextUtil.exit();
        }
    }

当 SphU#entry 调用到 ParamFlowSlot#entry 方法时,ParamFlowSlot 调用 checkFlow 方法判断是否需要限流。checkFlow 方法的实现如下:

   void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        //(1)
        if (args == null) {
            return;
        }
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            return;
        }
        List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
        //(2)
        for (ParamFlowRule rule : rules) {
            applyRealParamIdx(rule, args.length);

            // Initialize the parameter metrics.
            ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);

            if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
                String triggeredParam = "";
                if (args.length > rule.getParamIdx()) {
                    Object value = args[rule.getParamIdx()];
                    triggeredParam = String.valueOf(value);
                }
                throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
            }
        }
    }

  • checkFlow 方法的最后一个参数是请求参数,也就是调用 SphU#entry 方法传递进来的参数。
  • checkFlow 方法首先调用 ParamFlowRuleManager 的 API 判断当前资源有没有配置参数限流规则,如果有,则获取为当前资源配置的所有参数限流规则。
  • 遍历参数限流规则,调用 ParameterMetricStorage#initParamMetricsFor 方法判断是否需要为当前资源初始化创建 ParameterMetric,然后调用 ParamFlowChecker#passCheck 方法判断当前请求是否可以放行,如果需要拒绝请求,则抛出 ParamFlowException 异常。

在阅读 ParamFlowChecker#passCheck 方法的源码之前,我们需要先了解参数限流规则的配置,了解每个配置项的作用。

参数限流规则 ParamFlowRule 的源码如下(有删减):

public class ParamFlowRule extends AbstractRule {
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    private double count;
    private Integer paramIdx;
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    private int maxQueueingTimeMs = 0;
    private long durationInSec = 1;
    private int burstCount = 0;
}

  • grade:限流规则的阈值类型,支持的类型同 FlowRule。
  • count:阈值,同 FlowRule。
  • paramIdx:参数索引,ParamFlowChecker 根据限流规则的参数索引获取参数的值,下标从 0 开始,例如方法 public String apiHello(String name),该方法只有一个参数,索引为 0 对应 name 参数。
  • controlBehavior:流量控制效果,同 FlowRule,但只支持快速失败和匀速排队。
  • maxQueueingTimeMs:实现匀速排队流量控制效果的虚拟队列最大等待时间,超过该值的请求被抛弃,同 FlowRule;
  • durationInSec:统计指标数据的窗口时间大小,单位为秒。
  • burstCount:支持的突发流量总数。

假设需要针对资源“GET:/hello”的 name 参数限流,当 name 取值为“jackson”时限流 QPS 阈值为 5,则配置如下:

ParamFlowRule rule = new ParamFlowRule();
// 资源为/hello
rule.setResource("GET:/hello");
// 索引 0 对应的参数为 name
rule.setParamIdx(0); 
// qps 限流
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 阈值为 5
rule.setCount(5);
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

以此为例,我们分析 ParamFlowChecker#passCheck 方法源码,passCheck 返回 true 表示放行,返回 false 表示拒绝。

ParamFlowChecker#passCheck 方法源码如下:

    public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
                             Object... args) {
        if (args == null) {
            return true;
        }
        // 判断参数索引是否合法
        int paramIdx = rule.getParamIdx();
        if (args.length <= paramIdx) {
            return true;
        }
        // 获取参数值,如果值为空则允许通过
        Object value = args[paramIdx];
        if (value == null) {
            return true;
        }
        // 集群限流
        if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            return passClusterCheck(resourceWrapper, rule, count, value);
        }
        // 单机限流
        return passLocalCheck(resourceWrapper, rule, count, value);
    }

  • 如果参数为空、或者参数的总数小于等于规则配置的参数索引值、或者参数索引对应的参数的值为空,则放行请求;
  • 如果是集群限流模式,则调用 passClusterCheck 方法,否则调用 passLocalCheck 方法。

我们先不讨论集群限流情况,仅看单机本地限流情况。passLocalCheck 方法的源码如下:

  private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
                                          Object value) {
        try {
            // 基本数据类型
            if (Collection.class.isAssignableFrom(value.getClass())) {
                for (Object param : ((Collection)value)) {
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            }
            // 数组类
            else if (value.getClass().isArray()) {
                int length = Array.getLength(value);
                for (int i = 0; i < length; i++) {
                    Object param = Array.get(value, i);
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            }
            // 引用类型
            else {
                return passSingleValueCheck(resourceWrapper, rule, count, value);
            }
        } catch (Throwable e) {
        }
        return true;
    }

由于参数可能是基本数据类型,也可能是数组类型,或者引用类型,所以 passLocalCheck 方法分三种情况处理。我们只讨论其中一种情况,其它情况的实现类似。

以资源“GET:/hello”为例,其方法 apiHello 的 name 参数为 String 类型,因此会调用 passSingleValueCheck 方法,该方法源码如下:

  static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
       //(1) 
       if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
                return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
            } else {
                return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
            }
        } 
        // (2)
        else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
            Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
            long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
            if (exclusionItems.contains(value)) {
                int itemThreshold = rule.getParsedHotItems().get(value);
                return ++threadCount <= itemThreshold;
            }
            long threshold = (long)rule.getCount();
            return ++threadCount <= threshold;
        }

        return true;
    }

  • 当规则配置的阈值类型为 QPS 时,根据流控效果调用 passThrottleLocalCheck 或 passDefaultLocalCheck 方法;
  • 当规则配置的阈值类型为 THREAD 时,获取当前资源的 ParameterMetric,从而取得当前资源、当前参数的值对应的并行占用的线程总数,如果并行占用的线程总数+1 大于限流阈值则限流,否则放行。

你可能好奇,并行占用线程总数是在哪里自增和自减的呢?

这是由 ParamFlowStatisticEntryCallback 与 ParamFlowStatisticExitCallback 这两个 Callback 实现的,分别在 StatisticSlot 的 entry 方法和 exit 方法中被回调执行,这是我们前面分析 StatisticSlot 源码时故意遗漏的细节。

快速失败(直接拒绝)与匀速排队

1. 快速失败

快速失败基于令牌桶算法实现。passDefaultLocalCheck 方法控制每个时间窗口只生产一次令牌,将令牌放入令牌桶,每个请求都从令牌桶中取走令牌,当令牌足够时放行,当令牌不足时直接拒绝。ParameterMetric#tokenCounters 用作令牌桶,timeCounters 存储最近一次生产令牌的时间。

passDefaultLocalCheck 方法源码如下:

static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
                                         Object value) {
        //(1)
        ParameterMetric metric = getParameterMetric(resourceWrapper);
        CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
        CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
        if (tokenCounters == null || timeCounters == null) {
            return true;
        }
        // (2)
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long tokenCount = (long)rule.getCount();
        if (exclusionItems.contains(value)) {
            tokenCount = rule.getParsedHotItems().get(value);
        }
        if (tokenCount == 0) {
            return false;
        }
        // (3)
        long maxCount = tokenCount + rule.getBurstCount();
        if (acquireCount > maxCount) {
            return false;
        }
        while (true) {
            // (4)
            long currentTime = TimeUtil.currentTimeMillis();
            AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
            if (lastAddTokenTime == null) {
                tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                return true;
            }

            //(5)
            long passTime = currentTime - lastAddTokenTime.get();
            if (passTime > rule.getDurationInSec() * 1000) {
                // 确保非 NULL
                AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                if (oldQps == null) {
                    lastAddTokenTime.set(currentTime);
                    return true;
                } else {
                     //(6)
                    long restQps = oldQps.get();
                    // 计算需要新增的令牌数,根据时间间隔、限流阈值、窗口时间计算
                    long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
                    // 计算新的令牌总数,并立即使用(扣减 acquireCount 个令牌)
                    long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
                        : (restQps + toAddCount - acquireCount);

                    if (newQps < 0) {
                        return false;
                    }
                    if (oldQps.compareAndSet(restQps, newQps)) {
                        lastAddTokenTime.set(currentTime);
                        return true;
                    }
                    Thread.yield();
                }
            } else {
                // (7)
                AtomicLong oldQps = tokenCounters.get(value);
                if (oldQps != null) {
                    long oldQpsValue = oldQps.get();
                    // 令牌是否足够
                    if (oldQpsValue - acquireCount >= 0) {
                        // 从令牌桶中取走令牌
                        if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
                            return true;
                        }
                    } else {
                        return false;
                    }
                }
                Thread.yield();
            }
        }
    }

  1. 根据资源获取 ParameterMetric,从 ParameterMetric 获取当前限流规则的令牌桶和最近一次生产令牌的时间,时间精确到毫秒。
  2. 计算限流阈值,即令牌桶最大存放的令牌总数(tokenCount)。
  3. 重新计算限流阈值,将当前限流阈值加上允许突增流量的数量。
  4. 获取当前时间,如果当前参数值未生产过令牌,则初始化生产令牌,并立即使用(maxCount - acquireCount)。
  5. 获取当前时间与上次生产令牌的时间间隔,如果间隔时间大于一个窗口时间见(6),否则见(7)。
  6. 计算需要生产的令牌总数,并与当前桶中剩余的令牌数相加得到新的令牌总数,如果新的令牌总数大于限流阈值,则使用限流阈值作为新的令牌总数,并且生产完成立即使用(maxCount - acquireCount),最后更新最近一次生产令牌的时间。
  7. 从令牌桶中获取令牌,如果获取成功(oldQpsValue - acquireCount >= 0),则放行当前请求,否则拒绝当前请求。

2. 匀速排队

与 RateLimiterController 实现原理一样,passThrottleLocalCheck 方法让请求在虚拟队列中排队,控制请求通过的时间间隔,该时间间隔通过阈值与窗口时间大小计算出来,如果当前请求计算出来的排队等待时间大于限流规则指定的 maxQueueingTimeMs,则拒绝当前请求。

passThrottleLocalCheck 方法源码如下:

  static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,Object value) {
        //(1)
        ParameterMetric metric = getParameterMetric(resourceWrapper);
        CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
        if (timeRecorderMap == null) {
            return true;
        }
        // (2)
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long tokenCount = (long)rule.getCount();
        if (exclusionItems.contains(value)) {
            tokenCount = rule.getParsedHotItems().get(value);
        }
        if (tokenCount == 0) {
            return false;
        }
        //(3)
        long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
        while (true) {
            //(4)
            long currentTime = TimeUtil.currentTimeMillis();
            AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
            if (timeRecorder == null) {
                return true;
            }
            long lastPassTime = timeRecorder.get();
            // 计算当前请求的期望通过时间,最近一次请求的期望通过时间 + 请求通过的时间间隔
            long expectedTime = lastPassTime + costTime;
            //(5)
            if (expectedTime <= currentTime 
                 || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
                AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
                if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
                    long waitTime = expectedTime - currentTime;
                    if (waitTime > 0) {
                        lastPastTimeRef.set(expectedTime);
                        try {
                            TimeUnit.MILLISECONDS.sleep(waitTime);
                        } catch (InterruptedException e) {
                            RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
                        }
                    }
                    return true;
                } else {
                    Thread.yield();
                }
            } else {
                return false;
            }
        }
    }

  1. 当流控效果选择匀速限流时,ParameterMetric 的 ruleTimeCounters 不再是记录上次生产令牌的时间,而是记录最后一个请求的期望通过时间。
  2. 计算限流阈值,不支持突增流量。
  3. 计算请求通过的时间间隔,例如,当 acquireCount 等于 1、限流阈值配置为 200 且窗口时间大小为 1 秒时,计算出来的 costTime 等于 5ms,即每 5ms 只允许通过一个请求。
  4. 计算当前请求的期望通过时间,值为最近一次请求的期望通过时间 + 请求通过的时间间隔,最近一次请求的期望通过时间也就是虚拟队列中队列尾部的那个请求的期望通过时间。
  5. 如果期望通过时间与当前时间间隔大于规则配置的允许队列最大等待时间(maxQueueingTimeMs),则拒绝当前请求,否则将当前请求“放入”虚拟队列等待,计算出当前请求需要等待的时间,让当前线程休眠指定时长之后再放行该请求。

总结

黑白名单限流的实现相对简单,热点参数限流的实现相对复杂。热点参数限流自己实现了一个滑动窗口用于收集指标数据,但该滑动窗口并未被使用,而是使用 ParameterMetric 与 ParameterMetricStorage,这应该是出于性能的考虑。热点参数限流对性能的影响和对内存的占用与参数的取值有多少种可能成正比,限流参数的取值可能性越多,占用的内存就越大,对性能的影响也就越大,在使用热点参数限流功能时,一定要考虑参数的取值。

例如,根据商品 ID 限流,如果有十万个商品下单,那么 CacheMap 将会存在十万个 key-value,并且不会被移除,随着进程运行的时长而增长。如果限流阈值类型选择为 THREAD 则不会存在这个问题,因为在 ParamFlowStatisticExitCallback 方法会调用 ParameterMetric#decreaseThreadCount 方法扣减参数值占用的线程数,当线程数为零时,会将当前参数值对应的 key-value 从 CacheMap 中移除。