spring-boot漏桶限流实现实践
前言
今天最开始是打算通过线程池来实现漏桶限流算法的,但是实际分析之后发现似乎不具备可行性,难点有两个,一个是资源问题,如果每个接口方法都创建一个线程池的话,那是不敢想象的;另一个问题,如果全局采用一个线程池,那就无法实现精细化的接口限流,似乎也不够灵活,所以就放弃了,下面是我最初的思路:
定义一个线程池,漏桶通过线程池工作队列实现,漏桶出口速率通过线程的休眠来控制,丢弃超出容量的请求通过线程池的拒绝策略来实现。
最后,我直接找了一种网络上能够搜到的实现算法来完成今天实例demo
,下面让我们直接开始吧。
漏桶算法实现
首先我们先回顾下漏桶限流算法,它的具体原理是这样的:我们需要定义一个容量固定的漏桶,因为外部请求数量是不确定的,所以我们要通过漏桶的容量来控制请求数量。同时要确定漏桶释放请求的速率(出口),我们通过出口的速率,控制接口服务被调用的频速。当漏桶中的请求数达到上限时,所有申请加入漏桶的请求都会被丢弃掉。
详细研究漏桶算法,你会发现,关于请求丢弃的处理有两种方式,一种是直接丢弃多出来的请求,返回错误信息,另一种就是让当前请求进出阻塞状态,等到漏桶中释放出资源之后,再将请求放进漏桶中。今天我们先来看第一种,至于第二种,待我研究清楚了再说。
创建项目
和昨天一样,我们先创建一个spring boot
的web
项目,但是今天的项目就比较简单了,不需要引入任何外部包,只是为了方便测试,我引入了fastJson
的依赖:
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.72version>
dependency>
核心业务实现
我们先看下漏桶限流算法实现:
public final class LeakyBucket {
// 桶的容量
private int capacity = 10;
// 木桶剩余的水滴的量(初始化的时候的空的桶)
private AtomicInteger water = new AtomicInteger(0);
// 水滴的流出的速率 每1000毫秒流出1滴
private int leakRate;
// 第一次请求之后,木桶在这个时间点开始漏水
private long leakTimeStamp;
public LeakyBucket(int capacity, int leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
}
public LeakyBucket(int leakRate) {
this.leakRate = leakRate;
}
public boolean acquire() {
// 如果是空桶,就当前时间作为桶开是漏出的时间
if (water.get() == 0) {
leakTimeStamp = System.currentTimeMillis();
water.addAndGet(1);
return capacity != 0;
}
// 先执行漏水,计算剩余水量
int waterLeft = water.get() - ((int) ((System.currentTimeMillis() - leakTimeStamp) / 1000)) * leakRate;
water.set(Math.max(0, waterLeft));
// 重新更新leakTimeStamp
leakTimeStamp = System.currentTimeMillis();
// 尝试加水,并且水还未满
if ((water.get()) < capacity) {
water.addAndGet(1);
return true;
} else {
// 水满,拒绝加水
return false;
}
}
}
目前,网络上检索到的也基本上都是这种实现(也不知道谁抄的谁,我是不是也没脸说话,毕竟我也是代码搬运工)。
关于漏桶算法的实现,核心点是acquire()
方法,这个方法会判断漏桶是否已经满了,满了会直接返回false
,首次调用这个方法会返回true
,从第二次开始,会计算漏桶中的剩余水量,同时会更新水量,如果水量未达到水量上限,水量会+1
并返回true
。
但是这个算法的实现问题也很明显:leakRate
(出口速率)处理用于计算剩余水位外,并没有参与其他运算,这也就导致了漏桶的出口并不均匀。更合理的做法是,通过速率计算休眠时间,然后通过休眠时间控制速率的均匀性,今天由于时间的关系,我就先继续往下了,后面有时间了,优化完再来分享。
拦截器实现
今天的限速依然是通过拦截器来实现,实现过程也比较简单:
@Component
public class LeakyBucketLimiterInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
// 判断方法是否包含CounterLimit,有这个注解就需要进行限速操作
if (handlerMethod.hasMethodAnnotation(LeakyBucketLimit.class)) {
LeakyBucketLimit annotation = handlerMethod.getMethod().getAnnotation(LeakyBucketLimit.class);
LeakyBucket leakyBucket = (LeakyBucket)BeanTool.getBean(annotation.limitClass());
boolean acquire = leakyBucket.acquire();
response.setContentType("text/json;charset=utf-8");
JSONObject result = new JSONObject();
if (acquire) {
result.put("result", "请求成功");
} else {
result.put("result", "达到访问次数限制,禁止访问");
response.getWriter().print(JSON.toJSONString(result));
}
System.out.println(result);
return acquire;
}
}
return Boolean.TRUE;
}
}
首先我在配置类中构建漏桶算法的bean
,然后在拦截器中获取漏桶算法的实例,执行其acquire()
进行拦截操作,如果加入漏桶成功,则访问相关接口,否则直接返回错误信息。下面是漏桶算法的配置:
@Configuration
public class LeakyBucketConfig {
@Bean("leakyBucket")
public LeakyBucket leakyBucket() {
return new LeakyBucket(10, 5);
}
}
然后再是拦截器注解:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LeakyBucketLimit {
/**
* 限流器名称
* @return
*/
String limitBeanName();
/**
* 拦截器class
*
* @return
*/
Class> limitClass() default LeakyBucket.class;
}
将该注解加到我们的目标接口上即可实现限流操作:
@LeakyBucketLimit(limitBeanName = "leakyBucket")
@GetMapping("/bucket")
public Object bucketTest() {
JSONObject result = new JSONObject();
result.put("result", "请求成功");
logger.info("timestamp: {}, result: {}", System.currentTimeMillis(), result);
return result;
}
测试
这里测试直接通过postman
批量调用即可(具体可以自行百度):
这里我创建了20
个线程,然后直接调用接口:
从调用结果可以看出来,我们同时发起了20
个请求,但是系统只接受了10
个请求(也就是漏桶的上限),其余的请求直接被抛弃掉,说明限流效果已经达到,但是从系统运行的时间戳来看,这种限流算法的实现出口并不均匀,效果上甚至和我们昨天分享的计数器限流差不多,当然这也是我想吐槽的,所以说各位小伙伴在抄网上代码的时候,一定要亲自实践下,不能盲目抄作业。
结语
总结的话我在前面已经说了:我对这个算法并不满意。因为它的出口速率并不均匀,还需要进一步优化,因此今天的demo
示例只能算成功了一半——漏桶算法的web
实现思路分享完了,主要是业务层和限流解耦的思路,但是关于漏桶算法的核心实现并没解决,后面我打算参考guava
的RateLimiter
的休眠操作,优化上面的算法,所以今天就先到这里吧,各位小伙伴,晚安哟!