
Sentinel rate limiting and circuit breaking: usage and internals

soulballad · Distributed systems · Sentinel

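Common rate-limiting approaches

Semaphore

Semaphore is provided by the JDK (java.util.concurrent.Semaphore) and can be used for rate limiting by capping concurrency. For example, if only five callers may access a resource at the same time, any further caller has to wait. In the example below at most five tasks run at any moment; a waiting task starts as soon as a running one releases its permit.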

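import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {

    public static void main(String[] args) {
        // Thread pool
        ExecutorService exec = Executors.newCachedThreadPool();
        // At most 5 threads may hold a permit at the same time
        final Semaphore semp = new Semaphore(5);
        // Simulate 20 clients
        for (int index = 0; index < 20; index++) {
            final int NO = index;

            exec.execute(() -> {
                try {
                    // Acquire a permit (blocks until one is free)
                    semp.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    System.out.println("Accessing: " + NO);
                    // Simulate the actual business logic
                    Thread.sleep((long) (Math.random() * 10000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Always release the permit, even if the work fails
                    semp.release();
                }
            });
        }

        // Stop accepting new tasks; tasks already submitted still run
        exec.shutdown();
    }
}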
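
The demo above makes excess callers wait. A rate limiter often needs to reject them instead, which Semaphore.tryAcquire() supports. The following is a minimal sketch of that variant; the class TryAcquireLimiter and its methods are hypothetical names introduced here for illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper, not part of the original demo:
// it rejects work instead of queueing it.
class TryAcquireLimiter {
    private final Semaphore permits;

    TryAcquireLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Runs the task if a permit is free; returns false immediately otherwise. */
    boolean tryRun(Runnable task) {
        if (!permits.tryAcquire()) {
            return false; // over the concurrency limit: reject rather than wait
        }
        try {
            task.run();
            return true;
        } finally {
            permits.release(); // always hand the permit back
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        TryAcquireLimiter limiter = new TryAcquireLimiter(1);
        // The nested call runs while the outer task still holds the only permit.
        boolean outer = limiter.tryRun(() ->
            System.out.println("nested accepted: " + limiter.tryRun(() -> { })));
        System.out.println("outer accepted: " + outer);
    }
}
```

Running main prints "nested accepted: false" followed by "outer accepted: true", because the nested call arrives while the single permit is taken.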

RateLimiter

Guava (RateLimiter): hands out permits at a configured rate, in the style of the token bucket / leaky bucket algorithms

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>20.0</version>
</dependency>
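
import com.google.common.util.concurrent.RateLimiter;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class RateLimiterDemo {

   // 5 permits per second
   static RateLimiter rateLimiter = RateLimiter.create(5);

   private static void getAcquire(int index) {
       // tryAcquire() does not block: it returns false when no permit is available
       if (rateLimiter.tryAcquire()) {
           System.out.println(index + " is allowed through!");
       } else {
           System.out.println(index + " was rate limited!");
       }
   }

   public static void main(String[] args) {

       // The latch releases all 20 threads at the same moment
       CountDownLatch latch = new CountDownLatch(1);
       Random random = new Random();
       for (int i = 0; i < 20; i++) {
           final int index = i;
           new Thread(() -> {
               try {
                   latch.await();
                   // Spread the requests randomly over one second
                   Thread.sleep(random.nextInt(1000));
                   getAcquire(index);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           }).start();
       }

       latch.countDown();
   }
}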
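
To make the token bucket idea concrete, here is a minimal sketch of one. It is a simplification written for this article, not Guava's implementation: the class SimpleTokenBucket is a hypothetical name, and the current time is passed in explicitly (in nanoseconds) so the behaviour is easy to follow.

```java
// A simplified token bucket, NOT Guava's implementation.
class SimpleTokenBucket {
    private final double capacity;      // maximum tokens the bucket can hold
    private final double refillPerNano; // tokens added per nanosecond
    private double tokens;              // tokens currently available
    private long lastRefillNanos;       // time of the last refill

    SimpleTokenBucket(double permitsPerSecond, double capacity, long nowNanos) {
        this.capacity = capacity;
        this.refillPerNano = permitsPerSecond / 1_000_000_000.0;
        this.tokens = capacity;         // start with a full bucket
        this.lastRefillNanos = nowNanos;
    }

    /** Takes one token if available; never blocks. */
    synchronized boolean tryAcquire(long nowNanos) {
        // Add the tokens earned since the last call, capped at capacity.
        long elapsed = Math.max(0, nowNanos - lastRefillNanos);
        tokens = Math.min(capacity, tokens + elapsed * refillPerNano);
        lastRefillNanos = Math.max(lastRefillNanos, nowNanos);
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long ms = 1_000_000L; // nanoseconds per millisecond
        SimpleTokenBucket bucket = new SimpleTokenBucket(5, 2, 0L);
        System.out.println(bucket.tryAcquire(0));        // true  (burst of 2 allowed)
        System.out.println(bucket.tryAcquire(0));        // true
        System.out.println(bucket.tryAcquire(0));        // false (bucket empty)
        System.out.println(bucket.tryAcquire(300 * ms)); // true  (about 1.5 tokens refilled)
    }
}
```

The capacity bounds how large a burst can be, while the refill rate bounds the long-term average; both values here are arbitrary choices for the example.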

RRateLimiter

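Redisson (RRateLimiter) (token bucket)

A Redis-based approach to distributed rate limiting (source: oschina). Note that the example below does not call RRateLimiter itself; it keeps a counter with a one-second expiry in an RMapCache, which amounts to a fixed-window counter.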

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.4.1</version>
</dependency>
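
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.IntegerCodec;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.TimeUnit;

@Slf4j
public class RedissonDemo {

    private final RedissonClient redisClient;
    private final String key = "msgRateLimiter:" + "test";
    // Maximum number of requests allowed per second
    private final int limiter = 10000;

    @Autowired
    public RedissonDemo(RedissonClient redisClient) {
        this.redisClient = redisClient;
    }

    // On service startup, clear the Redis entry first so a stale count cannot cause errors
    public void reload() {
        RMapCache<String, Integer> msgRateLimit = 
            redisClient.getMapCache(key, IntegerCodec.INSTANCE);
        if (msgRateLimit.containsKey(key)) {
            msgRateLimit.delete();
        }
    }

    // Can be combined with an MQ consumer: ack the message on true, reject it on false
    public boolean handleMessage() {
        // Rate limiting in a distributed setting
        //String key = "msgRateLimiter:" + MsgConstants.MsgType.APP_PUSH[0];
        RMapCache<String, Integer> msgRateLimit =
            redisClient.getMapCache(key, IntegerCodec.INSTANCE);
        Integer count;
        try {
            // Create the counter with a 1-second TTL if it does not exist yet
            msgRateLimit.putIfAbsent(key, 0, 1L, TimeUnit.SECONDS);
            count = msgRateLimit.addAndGet(key, 1);
            log.info("get redis counter:{}", count);
            // count already includes the current request, hence <=
            if (count <= limiter) {
                // Put the code you want to execute here
                return true;
            }
            log.warn("rate limit exceeded:{}", count);
        } catch (Exception e) {
            log.error("err", e);
        }
        return false;
    }
}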
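
The counting logic above can be followed without Redis. The sketch below is an in-memory approximation under stated assumptions: FixedWindowCounter is a hypothetical class, the window starts at the first request after the previous window expired (mirroring the one-second TTL set by putIfAbsent), and time is passed in as milliseconds.

```java
// In-memory approximation of the Redis counter above; not Redisson code.
class FixedWindowCounter {
    private final int limit;         // requests allowed per window
    private final long windowMillis; // window length, like the TTL above
    private long windowStart = -1;   // -1 means no window has started yet
    private int count;

    FixedWindowCounter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // Start a fresh window when none exists or the old one has expired.
        if (windowStart < 0 || nowMillis - windowStart >= windowMillis) {
            windowStart = nowMillis;
            count = 0;
        }
        if (count >= limit) {
            return false;
        }
        count++;
        return true;
    }

    public static void main(String[] args) {
        FixedWindowCounter counter = new FixedWindowCounter(2, 1000);
        long[] times = {0, 900, 950, 1000, 1001, 1002};
        for (long t : times) {
            System.out.println("t=" + t + " -> " + counter.tryAcquire(t));
        }
    }
}
```

With a limit of 2 per second, the requests at t=900, t=1000 and t=1001 all pass even though they arrive within about 100 ms of each other. This boundary burst is the usual weakness of fixed windows and one motivation for the sliding window that Sentinel uses.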

Alibaba Sentinel

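What is Sentinel

Sentinel: the traffic guard of distributed systems

As microservices become popular, the stability of calls between services matters more and more. Sentinel takes traffic as its entry point and protects service stability along several dimensions: flow control, circuit breaking and degradation, and system load protection.

Sentinel consists of two parts:

  • The core library (Java client) does not depend on any framework or library and runs in any Java runtime environment; it also has good support for frameworks such as Dubbo and Spring Cloud.
  • The console (Dashboard) is built on Spring Boot and runs directly once packaged, with no extra application container such as Tomcat.

Sentinel has the following characteristics:

  • Rich application scenarios: Sentinel has handled the core traffic scenarios of Alibaba's Double 11 (Singles' Day) promotions for nearly 10 years, such as flash sales (keeping burst traffic within what the system can bear), message peak shaving and valley filling, cluster flow control, and real-time circuit breaking of unavailable downstream applications.
  • Complete real-time monitoring: Sentinel also provides real-time monitoring. In the console you can see per-second data for a single machine of a connected application, and even the aggregated runtime status of clusters with fewer than 500 machines.
  • Broad open-source ecosystem: Sentinel provides out-of-the-box integration modules for other open-source frameworks and libraries, such as Spring Cloud, Dubbo and gRPC. You only need to add the corresponding dependency and a little configuration to connect to Sentinel.
  • Complete SPI extension points: Sentinel provides easy-to-use, complete SPI extension interfaces. You can implement them to customize logic quickly, for example custom rule management or adapters for dynamic data sources.

Sentinel official site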

Sentinel features

Sentinel's main features:

[Figure: Sentinel's main features]

Sentinel's open-source ecosystem:

[Figure: Sentinel's open-source ecosystem]

Using Sentinel

  • Initialize the flow rules
  • Limit traffic according to the flow rules

pom dependencies

<!-- Sentinel core library -->
<dependency>
   <groupId>com.alibaba.csp</groupId>
   <artifactId>sentinel-core</artifactId>
   <version>1.6.3</version>
</dependency>
<!-- HTTP component Sentinel uses to connect to the dashboard -->
<dependency>
   <groupId>com.alibaba.csp</groupId>
   <artifactId>sentinel-transport-simple-http</artifactId>
   <version>1.6.3</version>
</dependency>

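Test class SentinelDemo:

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

import java.util.ArrayList;
import java.util.List;

public class SentinelDemo {

    private static String resource = "test sentinel";

    private static void initFlowRule() {
        // Collection of flow rules
        List<FlowRule> rules = new ArrayList<>();
        FlowRule flowRule = new FlowRule();
        // Resource to protect (a method name, an interface, ...)
        flowRule.setResource(resource);
        // Threshold type (QPS or thread count)
        flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        // Threshold: 10 requests per second
        flowRule.setCount(10);
        rules.add(flowRule);
        FlowRuleManager.loadRules(rules);
    }

    public static void main(String[] args) {
        // Initialize the flow rules
        initFlowRule();
        
        while (true) {
            Entry entry = null;
            try {
                // Apply the flow rules to this call
                entry = SphU.entry(resource);
                System.out.println("Access allowed");
            } catch (BlockException e) { // thrown when the call is rate limited
                e.printStackTrace();
            } finally {
                if (null != entry) {
                    entry.exit(); // release the entry
                }
            }
        }
    }
}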

Start the Sentinel dashboard: download sentinel-dashboard-1.6.3.jar from GitHub.

Start the jar from the command line with the following command:

# -Dserver.port=8888: start the service on port 8888
# -Dcsp.sentinel.dashboard.server=localhost:8888: register the dashboard itself for rate-limit monitoring
# -Dproject.name=sentinel-dashboard: set the dashboard's own application name to sentinel-dashboard
java -jar -Dserver.port=8888 -Dcsp.sentinel.dashboard.server=localhost:8888 -Dproject.name=sentinel-dashboard .\sentinel-dashboard-1.6.3.jar


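Open http://localhost:8888 in a browser and log in with username/password sentinel/sentinel. The following page is displayed:

[Screenshot: Sentinel dashboard after login]

Then start SentinelDemo with the dashboard address and an application name configured (the -Dcsp.sentinel.dashboard.server and -Dproject.name JVM options used above), so that it registers with the dashboard:

[Screenshot: SentinelDemo run configuration]

SentinelDemo then shows up in the dashboard:

[Screenshot: SentinelDemo in the dashboard]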

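Questions about Sentinel's rate limiting

  • Which algorithm implements the rate limiting? (A sliding window)
  • How is it implemented? (It is built on the chain of responsibility pattern)
  • SPI extension

Sentinel source code analysis

Call chain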

SphU.entry->Env.sph.entry->entry->entryWithPriority->lookProcessChain->chain.entry

lookProcessChain

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = 
                    new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

SlotChainProvider.newSlotChain

public static ProcessorSlotChain newSlotChain() {
    if (builder != null) {
        return builder.build();
    }

    resolveSlotChainBuilder();

    if (builder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}

builder.build

@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());

    return chain;
}
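
The build method above links the slots into a chain of responsibility: each slot does its own work and then hands the call to the next one. The sketch below shows that mechanism in plain Java. All class names (SketchSlot, RecordingSlot, RejectingSlot, SketchSlotChain) are illustrative stand-ins and not Sentinel classes; only the addLast and fireEntry names are borrowed from the source above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins only; these are not Sentinel classes.
abstract class SketchSlot {
    private SketchSlot next;

    void setNext(SketchSlot next) { this.next = next; }

    /** Passes the call on to the next slot, if there is one. */
    protected void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }

    abstract void entry(String resource, List<String> trace);
}

/** Records its name, then lets the call continue down the chain. */
class RecordingSlot extends SketchSlot {
    private final String name;

    RecordingSlot(String name) { this.name = name; }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        fireEntry(resource, trace);
    }
}

/** Blocks the call by throwing, so later slots never run. */
class RejectingSlot extends SketchSlot {
    @Override
    void entry(String resource, List<String> trace) {
        throw new IllegalStateException("blocked: " + resource);
    }
}

class SketchSlotChain {
    private SketchSlot first;
    private SketchSlot last;

    /** Appends a slot to the end of the chain. */
    SketchSlotChain addLast(SketchSlot slot) {
        if (first == null) {
            first = slot;
        } else {
            last.setNext(slot);
        }
        last = slot;
        return this;
    }

    void entry(String resource, List<String> trace) {
        if (first != null) {
            first.entry(resource, trace);
        }
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        new SketchSlotChain()
            .addLast(new RecordingSlot("statistic"))
            .addLast(new RecordingSlot("flow"))
            .addLast(new RecordingSlot("degrade"))
            .entry("demo", trace);
        System.out.println(trace); // [statistic, flow, degrade]
    }
}
```

A slot that throws, like RejectingSlot here, stops every slot after it from running, which is how a blocking slot can veto a request in this kind of design.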

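Sentinel architecture

In Sentinel every resource has a resource name (resourceName), and each invocation of a resource creates an Entry object. An Entry can be created automatically by the adapters for mainstream frameworks, or explicitly through annotations or the SphU API. Creating an Entry also sets up a series of function slots (the slot chain; as lookProcessChain above shows, the chain is built once per resource and then reused). Each slot has its own responsibility, for example:

  • NodeSelectorSlot: collects the call paths of resources and stores them in a tree structure, which is used to limit or degrade by call path;
  • ClusterBuilderSlot: stores the resource's statistics and caller information, such as RT, QPS and thread count, which serve as the basis for multi-dimensional rate limiting and degradation;
  • StatisticSlot: records and aggregates runtime metrics along different dimensions;
  • FlowSlot: performs flow control based on the preset flow rules and the state collected by the preceding slots;
  • AuthoritySlot: enforces black and white lists based on the configured lists and the call origin;
  • DegradeSlot: performs circuit breaking and degradation based on statistics and preset rules;
  • SystemSlot: controls the total inbound traffic based on system state, such as load1;

The overall framework is as follows:

[Figure: Sentinel slot chain architecture]

Sentinel exposes SlotChainBuilder as an SPI interface, which makes the slot chain extensible. You can add custom slots and arrange the order of the slots yourself, and so add custom functionality to Sentinel.

[Figure: slot chain extended through the SlotChainBuilder SPI]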
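
The lookup-with-fallback pattern seen in SlotChainProvider.newSlotChain can be sketched with the JDK's standard java.util.ServiceLoader. This is only an illustration of the Java SPI mechanism under assumptions: SketchChainBuilder, DefaultSketchChainBuilder and SketchChainProvider are hypothetical names, and the sketch does not claim to reproduce how Sentinel resolves its builder internally.

```java
import java.util.Iterator;
import java.util.ServiceLoader;

// Hypothetical SPI interface standing in for a chain builder.
interface SketchChainBuilder {
    String build();
}

// Built-in implementation used when no provider is registered.
class DefaultSketchChainBuilder implements SketchChainBuilder {
    @Override
    public String build() {
        return "default-chain";
    }
}

class SketchChainProvider {
    /**
     * Returns the first implementation registered through the Java SPI
     * mechanism, or the default one when none is found.
     */
    static SketchChainBuilder resolve() {
        Iterator<SketchChainBuilder> providers =
            ServiceLoader.load(SketchChainBuilder.class).iterator();
        return providers.hasNext() ? providers.next() : new DefaultSketchChainBuilder();
    }

    public static void main(String[] args) {
        // No provider file exists in this sketch, so the default is used.
        System.out.println(SketchChainProvider.resolve().build());
    }
}
```

With the standard Java SPI, a custom implementation is registered by listing its fully qualified class name in a file under META-INF/services/ named after the interface. Since this sketch ships no such file, resolve() falls back to the default builder.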

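Sliding window implementation

In Sentinel's slot chain, StatisticSlot is responsible for collecting statistics, so it is where the counts that the flow rules are checked against get recorded.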

StatisticSlot.entry

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // ...
    } catch (BlockException e) {
        // Blocked, set block exception to current entry.
        // ...
    } catch (Throwable e) {
        // Unexpected error, set error to current entry.
        // ...
    }
}

node.addPassRequest

@Override
public void addPassRequest(int count) {
    super.addPassRequest(count);
    this.clusterNode.addPassRequest(count);
}

clusterNode.addPassRequest

@Override
public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

rollingCounterInMinute.addPass

@Override
public void addPass(int count) {
    // Get the window (bucket) for the current time
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // Add to the pass count of that window
    wrap.value().addPass(count);
}

data.currentWindow

data.currentWindow() -> data.currentWindow(timeMillis): the no-argument call resolves to the overload below, which takes a timestamp.

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // Index of the bucket that the given time falls into
    int idx = calculateTimeIdx(timeMillis);
    // Start time of the bucket that the given time falls into
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);

    /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
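
The three cases in currentWindow are easier to follow in a stripped-down version. The sketch below is a simplified, synchronized sliding window written for this article: SketchSlidingWindow is a hypothetical class, it replaces the CAS and the conditional lock of the source above with plain synchronization, and it assumes timestamps never go backwards. The index and window-start arithmetic is the usual bucket computation and is assumed, not copied from the source.

```java
import java.util.Arrays;

// Simplified, synchronized sketch of a bucketed sliding window.
class SketchSlidingWindow {
    private final int windowLengthInMs; // length of one bucket
    private final int sampleCount;      // number of buckets in the ring
    private final long[] windowStarts;  // start time of each bucket, -1 if unused
    private final long[] passCounts;    // pass count of each bucket

    SketchSlidingWindow(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.windowStarts = new long[sampleCount];
        this.passCounts = new long[sampleCount];
        Arrays.fill(windowStarts, -1L);
    }

    /** Position in the ring of the bucket containing timeMillis. */
    int calculateTimeIdx(long timeMillis) {
        return (int) ((timeMillis / windowLengthInMs) % sampleCount);
    }

    /** Start time of the bucket containing timeMillis. */
    long calculateWindowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    synchronized void addPass(long timeMillis, int count) {
        int idx = calculateTimeIdx(timeMillis);
        long start = calculateWindowStart(timeMillis);
        if (windowStarts[idx] != start) {
            // Bucket is absent or deprecated: reset it to the current window.
            windowStarts[idx] = start;
            passCounts[idx] = 0;
        }
        passCounts[idx] += count; // bucket is up to date: just count
    }

    /** Sums the buckets that still lie inside the interval ending at timeMillis. */
    synchronized long passInInterval(long timeMillis) {
        long interval = (long) windowLengthInMs * sampleCount;
        long sum = 0;
        for (int i = 0; i < sampleCount; i++) {
            boolean used = windowStarts[i] >= 0;
            if (used && timeMillis - windowStarts[i] < interval) {
                sum += passCounts[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        SketchSlidingWindow window = new SketchSlidingWindow(5, 1000);
        window.addPass(888, 1);
        window.addPass(950, 2);
        window.addPass(1676, 1);
        System.out.println(window.passInInterval(1700)); // 4: buckets 800 and 1600 both count
        System.out.println(window.passInInterval(1800)); // 1: bucket 800 has slid out
    }
}
```

With five 200 ms buckets, a request at time 888 lands in the bucket starting at 800. A request at time 1888 maps to the same array slot, finds the old start time 800 and resets the bucket, which corresponds to the "deprecated" branch in the source.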

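Note the order in StatisticSlot.entry: fireEntry runs first, so the FlowSlot entry method checks the rules against the counts already recorded for earlier requests, and the current request is added to the statistics only after it has passed.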

FlowSlot#entry

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
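
The loop in checkFlow throws on the first rule that fails. The body of canPassCheck is not shown above, so the sketch below assumes the simplest possible check: reject when the requests already passed in the current window plus the requested count would exceed the rule's threshold. SketchFlowRule, SketchFlowException and SketchFlowChecker are hypothetical names, and the exception is unchecked only to keep the sketch short.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a flow rule: a resource name and a threshold.
class SketchFlowRule {
    final String resource;
    final double count;

    SketchFlowRule(String resource, double count) {
        this.resource = resource;
        this.count = count;
    }
}

// Unchecked here for brevity.
class SketchFlowException extends RuntimeException {
    SketchFlowException(String resource) {
        super("flow limit exceeded for " + resource);
    }
}

class SketchFlowChecker {
    /** Checks every rule in turn; the first failing rule blocks the request. */
    static void checkFlow(List<SketchFlowRule> rules, long passedInWindow, int acquireCount) {
        if (rules == null) {
            return;
        }
        for (SketchFlowRule rule : rules) {
            if (!canPass(rule, passedInWindow, acquireCount)) {
                throw new SketchFlowException(rule.resource);
            }
        }
    }

    /** ASSUMED check: already-passed requests plus this one must fit the threshold. */
    static boolean canPass(SketchFlowRule rule, long passedInWindow, int acquireCount) {
        return passedInWindow + acquireCount <= rule.count;
    }

    public static void main(String[] args) {
        List<SketchFlowRule> rules = Collections.singletonList(new SketchFlowRule("demo", 10));
        checkFlow(rules, 9, 1); // the 10th request in the window still fits
        try {
            checkFlow(rules, 10, 1); // the 11th does not
        } catch (SketchFlowException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch passedInWindow would come from a sliding window counter, which ties the rule check back to the statistics recorded by the statistics slot.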