自己动手写令牌桶、漏桶、计数等限流实现

时间:2022-06-04
本文章向大家介绍自己动手写令牌桶、漏桶、计数等限流实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

限流最核心的就是“在指定的时间内指定的访问者能访问多少次”。

顺着这个思路?现在举个例子“访问者A在2秒内访问次数不能超过5次”。

那么你就需要一个数据结构来存储"访问者A",同时记录“2秒”的过期时间,同时要记录在这2秒内的访问次数。

计数+过期时间

接下来我们就手动编写一个单机版的限流器。

/**
 * 简单的速率限制器
 * @author hezhuofan
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Bucket {
    /**
     * 指定的请求
     */
    private volatile String key;
    /**
     * 开始时间
     */
    private volatile Long start;
    /**
     * 定时时长
     */
    private volatile Long interval;
    /**
     * 当前次数
     */
    private volatile  AtomicInteger count;
    /**
     * 请求次数
     */
    private volatile Integer limit;

    public static void main(String[] args) {
        Bucket bucket = Bucket.builder().key("request a").limit(5)
                .interval(2000L).build();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
        bucket.request();
    }

    public boolean request() {
        if(start==null){
            start=new Date().getTime();
        }
        if(System.currentTimeMillis()-start<=interval){
            if(count==null){
                count=new AtomicInteger(0);
            }
            if(count.intValue()<=limit) {
                count.incrementAndGet();
                return true;
            }else{
                System.out.println(key+"被拒绝访问");
                return false;
            }
        }else{
            start=new Date().getTime();
            return request();
        }
    }
}

就像上面分析的那样我们使用key来记录“指定的访问者”,用start记录开始时间,用interval记录指定时间段,用limit记录允许访问的次数,使用count记录实际访问次数。

然后里边有一个request方法,这个方法会返回一个请求是否有资格被放行。

然而上面的只支持一个请求。为了支持多个请求。我们需要有一个map或者数组来维护多个请求各自的状态。

接下来我们就新建一个RateLimiter类来包装这个Bucket,使得它支持多个请求。

/**
 * 支持多个请求
 * @author hezhuofan
 */
public class RateLimiter {
    private ConcurrentHashMap<String,Bucket> buckets=new ConcurrentHashMap<>(300);//支持多个请求

    public boolean getKey(String key){
         Bucket bucket=buckets.get(key);
         if(bucket==null){
             Bucket bucket1 = Bucket.builder().key(key).limit(12)
                     .interval(2000L).build();
             bucket=bucket1;
             buckets.put(key,bucket1);
         }
         return bucket.request();
    }

    public static void main(String[] args) {
        RateLimiter rateLimiter= new RateLimiter();
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
        rateLimiter.getKey("request a");
    }
}

这样我们就支持了多个请求。每个请求都被存放在了map中,各自的key就是请求id本身。

好,这只是一个简单的通过计数和加过期时间的单机版的限流器。

令牌桶

事实上,限流还有令牌桶的方式。令牌桶的方式同样也是类似计数的方式。只是变为了有个地方可以发牌照,然后请求来了都去领牌照,如果拿到了则可以继续,否则就被拒绝。我们不妨来简单的实现一下:

/**
 * 令牌桶
 *
 * 核心算法,每个请求去领取token,拿到token然后继续
 * @author hezhuofan
 */
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class TokenBucket {
    /**
     * 唯一标识
     */
    private String key;
    /**
     * 桶的大小
     */
    private Integer limit;

    /**
     * 桶当前的token
     */
    private volatile  AtomicInteger tokens;

    /**
     * 加token
     */
    public void addToken(){
        if(tokens==null){
            tokens=new AtomicInteger(0);
        }
        tokens.incrementAndGet();
    }

    /**
     * 减token
     */
    public void delToken(){
        tokens.decrementAndGet();
    }


    public synchronized  boolean getToken(){
        if(tokens==null){
            tokens=new AtomicInteger(0);
        }
        if(tokens.intValue()>0){
            return  tokens.decrementAndGet()>0;
        }
        return false;
    }

    public static void main(String[] args) {

    }

}

上面我们写了一个令牌桶。用limit来表示桶的大小,用tokens来表示桶当前的容量,用key表示当前的请求ID(或者指代某个服务,比如服务ID)。

然后我们提供了对桶的基本操作。向桶中添加token,删除桶中的token,得到桶中的某个token。为了简单清晰明了,这里我们使用一个AtomicInteger的数量来表示“token们”。特别注意的是:getToken方法是通过简单的decrementAndGet方法减去1,然后判断返回结果大于0,则认为是拿到了token。

好,现在我们构建了一个基本的令牌桶。现在是时候往桶中添加token了。那么我们需要一个人按照指定速率向桶中添加token。

现在我们就去构造一个任务类。

/**
 * @author hezhuofan
 */
public class TokenProducer implements Runnable {
    protected TokenBucket tokenBucket;

    public TokenProducer(TokenBucket tokenBucket){
        this.tokenBucket=tokenBucket;
    }
    @Override
    public void run() {
         tokenBucket.addToken();
    }
}

这个任务类只干一件事情:向指定的桶中添加令牌(token)。

然后我们就来构造一个管理器,这个管理器负责按照指定速率添加token。

/**
 * token 生产者管理器
 * @author hezhuofan
 */
public class TokenProducerManager
{
    /**
     * 按照指定速率添加token
     * @param key 指定请求ID
     * @param period 速率
     * @param limit 桶的大小
     */
    private static TokenBucket execute(String key,Long period,Integer limit) {
        ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);
        TokenBucket tokenBucket = TokenBucket.builder().key(key).limit(limit).build();
        scheduledExecutorService.scheduleAtFixedRate(new TokenProducer(tokenBucket),0l,period, TimeUnit.SECONDS);
        return tokenBucket;
    }

    public static TokenBucket addTokenAtFixRate(String key,Long period,Integer limit){
        return execute(key,period,limit);
    }
}

这里我们使用并发包里的ScheduledExecutorService来实现定时添加token。我们把刚才新建的任务类TokenProducer的实例传入,然后调用scheduleAtFixedRate来启动定时添加token的任务,然后把已实例化并且正在被定时添加token的令牌桶TokenBucket实例返回,方便后续使用。我们暴露的方法addTokenAtFixRate支持指定请求key,指定速率,指定桶的大小。

然后我们执行如下测试代码:

public static void main(String[] args) throws InterruptedException {
    TokenBucket tokenBucket=TokenProducerManager.addTokenAtFixRate("request a",1l,2000);

    Thread.sleep(6000L);
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
    System.out.println(tokenBucket.getToken());
}

输出:

true

true

true

true

true

true

false

false

false

false

false

发现了没?其实本质上还是计数。当和前面的那种计数不一样的地方是,令牌桶支持动态的添加token,也就是动态改变上限。你可以控制添加令牌的速率。

漏桶

好,现在我们再来写个漏桶(leak bucket)算法的限流。漏桶算法和令牌桶有点像。但漏桶是直接把请求放进桶里,桶满了,其他放不进去的请求直接拒绝,。

漏桶核心是:请求来了以后,直接进桶,然后桶根据自己的漏洞大小慢慢往外面漏。

接下来我们就尝试来实现一下漏桶算法。实现漏桶最简单的方式就是使用一个FIFO的队列,一端负责不断的放入请求,另外一端负责吐出请求。

我们首先实现一个LeakBucket。

/**
 * 漏桶
 * @author hezhuofan
 */
public class LeakBucket<T> {
    private volatile String key;
    private volatile Integer limit=3000;
    private volatile  Queue<T> queue = new ArrayDeque<T>(this.limit);

    public boolean flow(T request){
        return queue.add(request);
    }

    public T leak(){
        return queue.poll();
    }

    public void setLimit(Integer limit){
        this.limit=limit;
    }

    public void setKey(String key){
        this.key=key;
    }

}

以上就是我们实现的简易漏桶。其中key依然是表示请求ID或服务ID,用来唯一标识。然后我们规定了漏桶的大小为3000,也就是说如果瞬间的高并发请求大量涌入的话,超出桶的大小就会直接被拒绝掉。然后是我们使用ArrayDeque这样一个FIFO队列来模拟漏桶。

接下来实现了两个核心方法一个是flow方法,一个是leak方法,顾名思义一个是负责流入,一个是负责漏出。其中flow方法使用队列的add方法实现,leak方法使用队列的poll方法实现。

现在漏桶已经有了。接下来就是需要有人负责往漏桶里加水(请求),有人按照指定的频率(对应于“漏洞的大小”)把请求从漏桶下面漏出。

为此,我们需要创建两个定时任务,一个任务负责加水,一个负责按照指定频率向外漏水。

public class FlowTask implements Runnable{
    protected LeakBucket<String> leakBucket;
    public FlowTask(LeakBucket<String> leakBucket){
        this.leakBucket=leakBucket;
    }

    @Override
    public void run() {
        leakBucket.flow("request a");
    }
}
public class LeakTask implements Runnable{
    protected LeakBucket leakBucket;
    public LeakTask(LeakBucket leakBucket){
        this.leakBucket=leakBucket;
    }

    @Override
    public void run() {
        leakBucket.leak();
    }
}

接下来我们像之前的算法那样编写一个manager类来启动两个定时任务,然后模拟漏桶效果。

public class LeakManager {

    private static LeakBucket<String> execute(String key, Long period, Integer limit) {
        ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);
        LeakBucket<String> leakBucket=new LeakBucket<>();
        leakBucket.setLimit(limit);
        leakBucket.setKey(key);

        ScheduledExecutorService flowScheduledExecutorService= Executors.newScheduledThreadPool(1);
        flowScheduledExecutorService.scheduleAtFixedRate(new FlowTask(leakBucket),0l,period, TimeUnit.SECONDS);

        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        scheduledExecutorService.scheduleAtFixedRate(new LeakTask(leakBucket),0l,period, TimeUnit.SECONDS);
        return leakBucket;
    }

    /**
     * 按照指定速率添加token
     * @param key 指定请求ID
     * @param period 速率
     * @param limit 桶的大小
     */
    public static LeakBucket<String> leakRequestAtFixRate(String key,Long period,Integer limit){
        return execute(key,period,limit);
    }

    public static void main(String[] args) throws InterruptedException {
        LeakBucket<String> leakBucket= LeakManager.leakRequestAtFixRate("request a",1l,2000);

        Thread.sleep(6000L);
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
        System.out.println(leakBucket.leak());
    }
}

好,以上就是三种限流的简单实现。

分布环境下的限流

好,上面的都只是单机版的简易实现,旨在让你明白各种限流算法的核心脉络。在分布式环境下,你就需要在一个集中的地方来维护计数和队列等等这些桶了。这时候就需要用到诸如redis或zookeeper来对上面对应的变量和队列进行修改了。

现成的解决方案

以下是摘抄的一些很不错的分布式限流的现成解决思路:

1、redis incr加过期时间来限流。

int current = jedis.incr(key);

if (current + 1 > limit) //如果超出限流大小

return 0;

else if (current == 1) //只有第一次访问需要设置2秒的过期时间

jedis.expire(key, "2");

return 1

2、另外还有通过redis+lua来实现限流。

3、hystrix的线程池就类似漏桶的思路。

4、guava包中有现成的基于令牌桶的限流实现。

总结

计数+过期时间的方式就是一种粗暴的限流方式,也是常见的限流方式。但无法对流量整形。比如你规定了一分钟内请求A最多能访问60次。但如果前10秒就来了60次请求,而之后的50秒则没有请求。令牌桶的方式是请求从桶中领取token,拿到后才可继续。而漏桶则是将请求放入桶中,然后漏洞按照指定的频率漏出请求,这样就可以对突发流量进行整形,让请求总是按照漏洞的大小平稳的漏出。如果在分布式下实现限流,需要把你的计数器和漏桶队列维护到一个公共的地方,比如redis,zookeeper,数据库等。hystrix的线程池就类似漏桶的思路,guava里有现成的基于令牌桶的限流实现。