Go third-party library source analysis: uber-go/ratelimit
https://github.com/uber-go/ratelimit is an implementation of a leaky-bucket rate limiter. Basic usage looks like this:
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
	now := rl.Take()
	fmt.Println(i, now.Sub(prev))
	prev = now
}
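In this example the limiter allows 100 requests per second, i.e. on average one request every 10ms, so the loop prints one line roughly every 10ms. The output looks like this (the measured durations are close to, not exactly, the values shown):
// Output:
// 0 0
// 1 10ms
// 2 10ms
// ...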
The package contains the following source files:
example_test.go
limiter_atomic.go
limiter_mutexbased.go
ratelimit.go
ratelimit_bench_test.go
ratelimit_test.go
1. ratelimit.New
Let's start with initialization:
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
	return newAtomicBased(rate, opts...)
}
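The rate argument is the number of tokens produced per `per` interval (one second by default). New delegates to newAtomicBased: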
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
	// TODO consider moving config building to the implementation
	// independent code.
	config := buildConfig(opts)
	perRequest := config.per / time.Duration(rate)
	l := &atomicLimiter{
		perRequest: perRequest,
		maxSlack:   -1 * time.Duration(config.slack) * perRequest,
		clock:      config.clock,
	}
	initialState := state{
		last:     time.Time{},
		sleepFor: 0,
	}
	atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
	return l
}
1. Build the configuration by applying the options: config := buildConfig(opts)
func buildConfig(opts []Option) config {
	c := config{
		clock: clock.New(),
		slack: 10,
		per:   time.Second,
	}
	for _, opt := range opts {
		opt.apply(&c)
	}
	return c
}
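By default, per is one second and slack is 10; each option passed to New overwrites a field of the config through its apply method.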
2. Compute how long it takes to produce one token, i.e. the interval between requests:
perRequest := config.per / time.Duration(rate)
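3. Initialize atomicLimiter with the per-token interval, the maximum slack, and the clock. Note that maxSlack = -slack × perRequest is a negative duration; with the defaults and rate = 100 it is -10 × 10ms = -100ms. The struct is: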
type atomicLimiter struct {
	state unsafe.Pointer
	//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
	// of this rate limiter in case of collocation with other frequently accessed memory.
	padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.

	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}
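4. Store the initial state: a zero last time (meaning no request has been seen yet) and a zero sleepFor. The state is published with atomic.StorePointer so that later Take calls can replace it with compare-and-swap.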
type state struct {
	last     time.Time
	sleepFor time.Duration
}
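With initialization done, we can look at how requests take tokens.
2. rl.Take
Take is the only method of the Limiter interface. It blocks as long as needed to respect the rate and returns the time at which the caller is allowed to proceed: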
// Limiter is used to rate-limit some process, possibly across goroutines.
// The process is expected to call Take() before every iteration, which
// may block to throttle the goroutine.
type Limiter interface {
	// Take should block to make sure that the RPS is met.
	Take() time.Time
}
atomicLimiter implements this interface:
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
	var (
		newState state
		taken    bool
		interval time.Duration
	)
	for !taken {
		now := t.clock.Now()
		previousStatePointer := atomic.LoadPointer(&t.state)
		oldState := (*state)(previousStatePointer)
		newState = state{
			last:     now,
			sleepFor: oldState.sleepFor,
		}
		// If this is our first request, then we allow it.
		if oldState.last.IsZero() {
			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
			continue
		}
		// sleepFor calculates how much time we should sleep based on
		// the perRequest budget and how long the last request took.
		// Since the request may take longer than the budget, this number
		// can get negative, and is summed across requests.
		newState.sleepFor += t.perRequest - now.Sub(oldState.last)
		// We shouldn't allow sleepFor to get too negative, since it would mean that
		// a service that slowed down a lot for a short period of time would get
		// a much higher RPS following that.
		if newState.sleepFor < t.maxSlack {
			newState.sleepFor = t.maxSlack
		}
		if newState.sleepFor > 0 {
			newState.last = newState.last.Add(newState.sleepFor)
			interval, newState.sleepFor = newState.sleepFor, 0
		}
		taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
	}
	t.clock.Sleep(interval)
	return newState.last
}
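1. Read the current time and load the current state pointer.
2. If this is the first request (the last request time is the zero value), record the current time as the last request time and let the request through immediately.
3. Otherwise compute the sleep time: sleepFor = previous sleepFor + perRequest - (now - last), where now - last is the gap since the previous request.
4. If sleepFor is below maxSlack, the limiter has been idle for a long time. sleepFor is clamped to maxSlack so that the accumulated credit, and therefore the size of the next burst, stays bounded.
5. If sleepFor is positive, requests are arriving faster than the rate allows: last is moved forward to now + sleepFor, sleepFor is reset to 0, and once the new state has been swapped in, the caller sleeps for that interval via t.clock.Sleep(interval) before returning.
6. If sleepFor is zero or negative, the request can proceed immediately: last is set to the current time and the negative sleepFor is kept as credit for later requests.
7. The new state is published with CompareAndSwapPointer. If another goroutine changed the state in the meantime, the CAS fails and the loop recomputes everything from the fresh state.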
mutexLimiter also implements the interface:
type mutexLimiter struct {
	sync.Mutex
	last       time.Time
	sleepFor   time.Duration
	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}
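The difference is that mutexLimiter protects its fields with a mutex, whereas atomicLimiter uses atomic compare-and-swap. Note that the mutex version calls Sleep while still holding the lock, so other callers wait on the lock until it returns: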
func (t *mutexLimiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()
	now := t.clock.Now()
	// If this is our first request, then we allow it.
	if t.last.IsZero() {
		t.last = now
		return t.last
	}
	// sleepFor calculates how much time we should sleep based on
	// the perRequest budget and how long the last request took.
	// Since the request may take longer than the budget, this number
	// can get negative, and is summed across requests.
	t.sleepFor += t.perRequest - now.Sub(t.last)
	// We shouldn't allow sleepFor to get too negative, since it would mean that
	// a service that slowed down a lot for a short period of time would get
	// a much higher RPS following that.
	if t.sleepFor < t.maxSlack {
		t.sleepFor = t.maxSlack
	}
	// If sleepFor is positive, then we should sleep now.
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}
	return t.last
}
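In a plain leaky bucket the interval between requests is fixed, but real-world internet traffic is often bursty. To handle this, uber-go modifies the leaky bucket and introduces the concept of a maximum slack (maxSlack). First, the background: suppose we limit traffic to 100 requests per second, i.e. an average interval of 10ms. In practice some requests arrive after longer gaps and some after shorter ones.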
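(1) When t.sleepFor > 0, the time saved by earlier requests is not enough to cover the interval this request needs, so the limiter sleeps for that long and then resets t.sleepFor to 0.
(2) When t.sleepFor < 0, this request arrived later than the expected interval. The extra time simply accumulates in t.sleepFor as a negative value and offsets later, more closely spaced requests.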
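However, suppose request 1 completes and request 2 arrives much later, possibly hours later. For request 2 the gap now.Sub(t.last) is then so large that even a flood of requests arriving at once afterwards could not use it up, and the limiter would effectively stop limiting.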
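To prevent this, ratelimit introduces the maximum slack (maxSlack). It is a negative value that caps how much accumulated credit can be carried over. With the default slack of 10 it equals -10 × perRequest, so after a long idle period only around ten requests can pass back to back before the normal spacing resumes.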