博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【springcloud】2.eureka源码分析之令牌桶-限流算法
阅读量:5009 次
发布时间:2019-06-12

本文共 4492 字,大约阅读时间需要 14 分钟。

国际惯例原理图

 

 

代码实现

package Thread;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicLong;/** * @ProjectName: cutter-point * @Package: Thread * @ClassName: RateLimiter * @Author: xiaof * @Description: 令牌桶,限流 * @Date: 2019/6/21 11:41 * @Version: 1.0 */public class RateLimiter {    //限流消费的令牌    private final AtomicInteger consumedTokens = new AtomicInteger();    private final AtomicLong lastRefrushTokenTime = new AtomicLong(0);    //限流类型,是秒,还是分    private final long rateType;    public RateLimiter(TimeUnit averageRateUnit) {        switch (averageRateUnit) {            case SECONDS:                rateType = 1000;                break;            case MINUTES:                rateType = 60 * 1000;                break;            default:                throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");        }    }    //请求令牌,判断是否可以获取到新的令牌    public boolean acquire(int bucketSize, long avgRate) {        return acquire(bucketSize, avgRate, System.currentTimeMillis());    }    public boolean acquire(int bucketSize, long avgRate, long curentTimeMillis) {        if(bucketSize <= 0 || avgRate <= 0) {            return true;//如果这2个参数,任意一个为0 ,我们就认为没有上限        }        //刷新令牌桶        refillToken(bucketSize, avgRate, curentTimeMillis);        //开始消费令牌        return consumToken(bucketSize);    }    private void refillToken(int bucketSize, long avgRate, long currentTimeMillis) {        //获取上次最后以后更新令牌时间        long freshTime = lastRefrushTokenTime.get();        //获取当前间隔时间        long timeDelta = currentTimeMillis - freshTime;        //计算这次需要填充的token数        long newToken = timeDelta * avgRate /rateType;        if(newToken > 0) {            //新的更新时间            long newFillTime = freshTime == 0 ? currentTimeMillis : freshTime + timeDelta;            //用cas操作,以保证只有一个线程注入新令牌            if(lastRefrushTokenTime.compareAndSet(freshTime, newFillTime)) {                //死循环,直到设置成功新的令牌                while(true) {                    //1.获取当前消费的令牌                    int currentConsumToken = consumedTokens.get();                    //2.获取消费的令牌容量,跟桶极限大小比较,取小的那个                    int realConsumTokens = Math.min(currentConsumToken, bucketSize);                    //3.计算填充之后剩余的被消费的容量,计算新增容量,用来填充被消费的令牌数                    //剩余的消费容量,但是不能比0还小,这个要取值                    int newConsumSize = (int) Math.max(0, realConsumTokens - newToken);                    //然后设置进去                    if(consumedTokens.compareAndSet(currentConsumToken, newConsumSize)) {                        return;                    }                }            }        }    }    //消费令牌    private boolean consumToken(int bucketSize) {        while (true) {            int currentLevel = consumedTokens.get();            //如果超出负载            if (currentLevel >= bucketSize) {                return false;            }            //每次消费一个            if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {                return true;            }        }    }    public void reset() {        consumedTokens.set(0);        lastRefrushTokenTime.set(0);    }}

 

 

到这里可能有的人不清楚怎么用,来我们测试一波

我们假设有100个用户同时请求,然后令牌恢复速率调成10,然后速率单位改为秒,也就是1秒恢复10个令牌

这样同时100个请求过来,马上令牌就会被用完,那么就会被限流,比如我们拦截器这个时候可以返回404,或者503

public static void main(String args[]) {        RateLimiter rateLimiter = new RateLimiter(TimeUnit.SECONDS);        final int bucketSize = 10;        //回复令牌生产速率        final long avgRate = 10;        //判断是否流量达到上限        ExecutorService pool = Executors.newCachedThreadPool();        for(int i = 0; i < 100; ++i) {            pool.submit(new Runnable() {                @Override                public void run() {                    while(true) {                        try {                            Thread.sleep(((int) (Math.random() * 10)) * 1000) ;                            if(!rateLimiter.acquire(bucketSize, avgRate)) {                                System.err.println(Thread.currentThread().getName() + "已经限流成功----response.setStatus(404)");                            } else {                                System.out.println(Thread.currentThread().getName() + "正常执行");                            }                        } catch (InterruptedException e) {                            e.printStackTrace();                        }                    }                }            });        }        while(true) {        }    }

 

效果展示

 

 

转载于:https://www.cnblogs.com/cutter-point/p/11064362.html

你可能感兴趣的文章
1.7 将一个MxN矩阵所有为0的元素所在行和列全部置0
查看>>
删除U盘时提示无法停止‘通用卷’设备的解决方法!!不要每次都硬拔了,对电脑有不小的损害!!!...
查看>>
Java中接口与接口和类之间的关系
查看>>
芯片TPS70925
查看>>
.net wordpress 服务器类
查看>>
MVC的一个简单实例
查看>>
python中的字符串格式化
查看>>
JVM平台上的响应式流(Reactive Streams)规范
查看>>
南阳理工57---6174问题
查看>>
Spinner实现列表下拉功能
查看>>
HTML5全栈工程师学什么?
查看>>
【Dart】生成固定长度随机数
查看>>
.NET 使用 RabbitMQ 图文简介
查看>>
php中的namespace 命名空间
查看>>
python数据写入Excel表格
查看>>
linux shell 发送email 附件
查看>>
人群密度估计 CrowdCount
查看>>
京东为什么不会死
查看>>
JSON.parse()和JSON.stringify()
查看>>
.net 常用正则表达式
查看>>