Go 第三方库源码分析:uber-go/ratelimit

共 6190字,需浏览 13分钟

 ·

2021-10-02 04:00

https://github.com/uber-go/ratelimit 是一个漏桶限流器的实现,

rl := ratelimit.New(100// per second
prev := time.Now()for i := 0; i < 10; i++ { now := rl.Take() fmt.Println(i, now.Sub(prev)) prev = now}

在这个例子中,我们给定限流器每秒可以通过 100 个请求,也就是平均每个请求间隔 10ms。因此,最终会每 10ms 打印一行数据。输出结果如下:

// Output:// 0 0// 1 10ms// 2 10ms

整个包中源码如下:

example_test.golimiter_atomic.golimiter_mutexbased.goratelimit.goratelimit_bench_test.goratelimit_test.go


1,ratelimit.New

先看下初始化的过程:

// New returns a Limiter that will limit to the given RPS.func New(rate int, opts ...Option) Limiter {  return newAtomicBased(rate, opts...)}

传入的参数是1s内产生的token数量:

// newAtomicBased returns a new atomic based limiter.func newAtomicBased(rate int, opts ...Option) *atomicLimiter {  // TODO consider moving config building to the implementation  // independent code.  config := buildConfig(opts)  perRequest := config.per / time.Duration(rate)  l := &atomicLimiter{    perRequest: perRequest,    maxSlack:   -1 * time.Duration(config.slack) * perRequest,    clock:      config.clock,  }
initialState := state{ last: time.Time{}, sleepFor: 0, } atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) return l}

1,通过options修改配置参数 config := buildConfig(opts)

func buildConfig(opts []Option) config {  c := config{    clock: clock.New(),    slack: 10,    per:   time.Second,  }
for _, opt := range opts { opt.apply(&c) } return c}

可以看到,默认情况下per是1s

2,计算产生一个令牌话费的时间(时间间隔

perRequest := config.per / time.Duration(rate)

3,初始化atomicLimiter,令牌产生时间间隔,时钟

type atomicLimiter struct {  state unsafe.Pointer  //lint:ignore U1000 Padding is unused but it is crucial to maintain performance  // of this rate limiter in case of collocation with other frequently accessed memory.  padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
perRequest time.Duration maxSlack time.Duration clock Clock}

4,记录初始化状态:当前时间和休眠时间

type state struct {  last     time.Time  sleepFor time.Duration}

完成初始化的流程后,我们就进入了令牌产生的流程了。


2,rl.Take

Take是一个接口,返回当前时间

// Limiter is used to rate-limit some process, possibly across goroutines.// The process is expected to call Take() before every iteration, which// may block to throttle the goroutine.type Limiter interface {  // Take should block to make sure that the RPS is met.  Take() time.Time}

atomicLimiter 实现了这个接口

// Take blocks to ensure that the time spent between multiple// Take calls is on average time.Second/rate.func (t *atomicLimiter) Take() time.Time {  var (    newState state    taken    bool    interval time.Duration  )  for !taken {    now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer)
newState = state{ last: now, sleepFor: oldState.sleepFor, }
// If this is our first request, then we allow it. if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue }
// sleepFor calculates how much time we should sleep based on // the perRequest budget and how long the last request took. // Since the request may take longer than the budget, this number // can get negative, and is summed across requests. newState.sleepFor += t.perRequest - now.Sub(oldState.last) // We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(interval) return newState.last}

1,获取当前时间

2,如果是初始化状态,也就是上一次访问时间是0,那么设置上一次访问时间是当前时间,直接返回。

3,计算睡眠的时间,睡眠时间=上一次记录的睡眠时间+每个令牌产生的时间间隔-(当前时间-上一次访问时间),也就是访问时间间隔

4,如果睡眠时间小于maxSlack,说明请求量很小,距离上一次访问时间已经很久了,将睡眠时间修改成maxSlack否则无法应对大量突发流量

5,如果睡眠时间大于0,说明请求量比较大,需要等待一段时间才能返回,调用 t.clock.Sleep(t.sleepFor),进入睡眠状态,同时修改上一次访问时间和休眠时间

6,如果小于等于0,说明请求量不大,可以立即返回,并记录当前时间

mutexLimiter 也实现了上述接口:

type mutexLimiter struct {  sync.Mutex  last       time.Time  sleepFor   time.Duration  perRequest time.Duration  maxSlack   time.Duration  clock      Clock}

差别就是一个是基于互斥锁实现的,一个是基于原子操作实现的

func (t *mutexLimiter) Take() time.Time {  t.Lock()  defer t.Unlock()
now := t.clock.Now()
// If this is our first request, then we allow it. if t.last.IsZero() { t.last = now return t.last }
// sleepFor calculates how much time we should sleep based on // the perRequest budget and how long the last request took. // Since the request may take longer than the budget, this number // can get negative, and is summed across requests. t.sleepFor += t.perRequest - now.Sub(t.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if t.sleepFor < t.maxSlack { t.sleepFor = t.maxSlack }
// If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { t.clock.Sleep(t.sleepFor) t.last = now.Add(t.sleepFor) t.sleepFor = 0 } else { t.last = now }
return t.last}

 Leaky Bucket,每个请求的间隔是固定的,然而,在实际上的互联网应用中,流量经常是突发性的。对于这种情况,uber-go 对 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念。我们先理解下整体背景: 假如我们要求每秒限定 100 个请求,平均每个请求间隔 10ms。但是实际情况下,有些请求间隔比较长,有些请求间隔比较短。

(1)当 t.sleepFor > 0,代表此前的请求多余出来的时间,无法完全抵消此次的所需量,因此需要 sleep 相应时间, 同时将 t.sleepFor 置为 0。

(2)当 t.sleepFor < 0,说明此次请求间隔大于预期间隔,将多出来的时间累加到 t.sleepFor 即可。

但是,对于某种情况,请求 1 完成后,请求 2 过了很久到达 (好几个小时都有可能),那么此时对于请求 2 的请求间隔 now.Sub(t.last),会非常大。以至于即使后面大量请求瞬时到达,也无法抵消完这个时间。那这样就失去了限流的意义。

了防止这种情况,ratelimit 就引入了最大松弛量 (maxSlack) 的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现。



推荐阅读


福利

我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。


浏览 92
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报