Kubernetes 源码学习之限速队列

时间:2022-07-26
本文章向大家介绍Kubernetes 源码学习之限速队列,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前面我们详细分析了 client-go 中的延迟队列的实现,接下来就是限速队列的实现,限速队列在我们日常应用中非常广泛,其原理也比较简单,利用延迟队列的特性,延迟某个元素的插入时间来达到限速的目的。

所以限速队列是扩展的延迟队列,在其基础上增加了 AddRateLimitedForgetNumRequeues 3个方法。限速队列接口的定义如下所示:

// k8s.io/client-go/util/workqueue/rate_limiting_queue.go

// RateLimitingInterface 是对加入队列的元素进行速率限制的接口
type RateLimitingInterface interface {
  // 延时队列
 DelayingInterface 

 // 在限速器说ok后,将元素item添加到工作队列中
 AddRateLimited(item interface{})

 // 丢弃指定的元素
 Forget(item interface{})

  // 查询元素放入队列的次数
 NumRequeues(item interface{}) int
}

很明显我们可以看出来限速队列是在延时队列基础上进行的扩展,接下来我们查看下限速队列的实现结构:

// k8s.io/client-go/util/workqueue/rate_limiting_queue.go

// 限速队列的实现
type rateLimitingType struct {
  // 同样集成了延迟队列
 DelayingInterface
  // 因为是限速队列,所以在里面定义一个限速器
 rateLimiter RateLimiter
}

// 通过限速器获取延迟时间,然后加入到延时队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
 q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

// 直接通过限速器获取元素放入队列的次数
func (q *rateLimitingType) NumRequeues(item interface{}) int {
 return q.rateLimiter.NumRequeues(item)
}

// 直接通过限速器丢弃指定的元素
func (q *rateLimitingType) Forget(item interface{}) {
 q.rateLimiter.Forget(item)
}

我们可以看到限速队列的实现非常简单,就是通过包含的限速器去实现各种限速的功能,所以我们要真正去了解的是限速器的实现原理。

限速器

限速器当然也是一种接口抽象,我们可以实现各种各样的限速器,甚至不限速也可以,该接口定义如下所示:

// k8s.io/client-go/util/workqueue/default_rate_limiter.go

// 限速器接口
type RateLimiter interface {
 // 获取指定的元素需要等待的时间
 When(item interface{}) time.Duration
 // 释放指定元素,表示该元素已经被处理了
 Forget(item interface{})
 // 返回某个对象被重新入队多少次,监控用
 NumRequeues(item interface{}) int
}

可以看到和上面的限速队列的扩展接口方法非常类似,索引在实现的时候我们都是直接调用限速器对应的实现方法。接下来我们来看看几种限速器的具体实现。

BucketRateLimiter

第一个要了解的限速器使用非常频繁 - BucketRateLimiter(令牌桶限速器),这是一个固定速率(qps)的限速器,该限速器是利用 golang.org/x/time/rate 库来实现的,令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过,而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。

比如抽奖、抢优惠、投票、报名……等场景,在面对突然到来的上百倍流量峰值,除了消息队列,预留容量以外,我们可以考虑做峰值限流。因为对于大部分营销类活动,消息限流(对被限流的消息直接丢弃,并直接回复:“系统繁忙,请稍后再试。”)并不会对营销的结果有太大影响。

令牌桶是有一个固定大小的桶,系统会以恒定的速率向桶中放 Token,桶满了就暂时不放了,而用户则从桶中取 Token,如果有剩余的 Token 就可以一直取,如果没有剩余的 Token,则需要等到系统中放置了 Token 才行。

golang 中就自带了一个令牌桶限速器的实现,我们可以使用以下方法构造一个限速器对象:

limiter := NewLimiter(10, 1);

该构造函数包含两个参数:

  1. 第一个参数是 r Limit。代表每秒可以向 Token 桶中产生多少 token,Limit 实际上是 float64 的别名。
  2. 第二个参数是 b int。b 代表 Token 桶的容量大小。

上面我们构造出的限速器含义就是,其令牌桶大小为 1,以每秒 10 个 Token 的速率向桶中放置 Token。

除了直接指定每秒产生的 Token 个数外,还可以用 Every 方法来指定向 Token 桶中放置 Token 的间隔,例如:

limit := Every(100 * time.Millisecond)
limiter := NewLimiter(limit, 1)

以上就表示每 100ms 往桶中放一个 Token,本质上也就是一秒钟产生 10 个。

Limiter 提供了三类方法供用户消费 Token,用户可以每次消费一个 Token,也可以一次性消费多个 Token。而每种方法代表了当 Token 不足时,各自不同的对应手段。

Wait/WaitN

func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)

Wait 实际上就是 WaitN(ctx,1)。当使用 Wait 方法消费 Token 时,如果此时桶内 Token 不足时 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件,当然如果充足则直接返回。我们可以看到,Wait 方法有一个 context 参数,我们可以设置 context 的 Deadline 或者 Timeout,来决定此次 Wait 的最长时间。

Allow/AllowN

func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

Allow 实际上就是 AllowN(time.Now(),1)。AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之返回不消费 Token,false。通常对应这样的线上场景,如果请求速率过快,就直接丢到某些请求。

Reserve/ReserveN

func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

Reserve 相当于 ReserveN(time.Now(), 1)。ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 Token 是否充足,都会返回一个 Reservation 对象指针。

你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间,如果等待时间为 0,则说明不用等待。必须等到等待时间之后,才能进行接下来的工作。或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 Token 归还。

动态调整速率

此外 Limiter 还支持调整速率和桶大小:

  1. SetLimit(Limit) 改变放入 Token 的速率
  2. SetBurst(int) 改变 Token 桶大小

有了这两个方法,可以根据现有环境和条件,根据我们的需求,动态的改变 Token 桶大小和速率。

令牌桶限速器实现

了解了令牌桶如何使用后,接下来就可以直接查看下令牌桶限速器是如何实现的:

// k8s.io/client-go/util/workqueue/default_rate_limiter.go

// 令牌桶限速器,固定速率(qps)
type BucketRateLimiter struct {
  // golang 自带的 Limiter 
 *rate.Limiter
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
 // 获取需要等待的时间(延迟),而且这个延迟是一个相对固定的周期
  return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
 // 固定频率,不需要重试
  return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
  // 不需要重试,因此也不需要忘记
}

令牌桶限速器里面直接包装一个令牌桶 Limiter 对象,直接通过 Limiter.Reserve().Delay() 方法就可以获取元素需要延迟的时间,再使用这个限速器的时候,默认初始化参数为:

// k8s.io/client-go/util/workqueue/default_rate_limiter.go

BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}

通过 rate.NewLimiter 实例化,传入 r 和 b 两个参数,r 表示每秒往“”桶中填充 token 的数量,b 表示令牌桶的大小,这里可以看到默认的参数为速率为10,即每秒放入10个 bucket,桶容量大小为100。

ItemExponentialFailureRateLimiter

ItemExponentialFailureRateLimiter(指数增长限速器) 是比较常用的限速器,从字面意思解释是元素错误次数指数递增限速器,他会根据元素错误次数逐渐累加等待时间,具体实现如下:

// k8s.io/client-go/util/workqueue/default_rate_limiters.go

// 当对象处理失败的时候,其再次入队的等待时间 × 2,到 MaxDelay 为止,直到超过最大失败次数
type ItemExponentialFailureRateLimiter struct {
    // 修改失败次数用到的锁
    failuresLock sync.Mutex           
    // 记录每个元素错误次数
    failures     map[interface{}]int  
    // 元素延迟基数
    baseDelay time.Duration           
    // 元素最大的延迟时间
    maxDelay  time.Duration           
}

// 实现限速器的When接口
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    // 累加错误计数
    exp := r.failures[item]
    r.failures[item] = r.failures[item] + 1
 
    // 通过错误次数计算延迟时间,公式是2^i * baseDelay,按指数递增
    backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
    if backoff > math.MaxInt64 {
        // 最大延迟时间
        return r.maxDelay
    }

    // 计算后的延迟值和最大延迟值之间取最小值
    calculated := time.Duration(backoff)
    if calculated > r.maxDelay {
        return r.maxDelay
    }
 
    return calculated
}

// 元素错误次数,直接从 failures 中取
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
 
    return r.failures[item]
}

//  直接从 failures 删除指定元素
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
 
    delete(r.failures, item)
}

从上面 When() 函数中的算法实现来看,该限速器是出现错误后不断尝试的过程,而且随着尝试次数的增加按照指数增加延迟时间

ItemFastSlowRateLimiter

ItemFastSlowRateLimiter (快慢限速器)和 ItemExponentialFailureRateLimiter 很像,都是用于错误尝试的,但是 ItemFastSlowRateLimiter 的限速策略是尝试次数超过阈值用长延迟,否则用短延迟,不过该限速器很少使用。

// k8s.io/client-go/util/workqueue/default_rate_limiters.go

// 快慢限速器,先以 fastDelay 为周期进行尝试,超过 maxFastAttempts 次数后,按照 slowDelay 为周期进行尝试
type ItemFastSlowRateLimiter struct {
    failuresLock sync.Mutex          
    // 错误次数计数
    failures     map[interface{}]int 

    // 错误尝试阈值
    maxFastAttempts int        
    // 短延迟时间      
    fastDelay       time.Duration    
    // 长延迟时间
    slowDelay       time.Duration    
}

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
 
    // 一样累加错误计数
    r.failures[item] = r.failures[item] + 1

    // 错误次数还未超过阈值
    if r.failures[item] <= r.maxFastAttempts {
        // 用短延迟
        return r.fastDelay
    }
    // 超过了用长延迟
   return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
 
    return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
 
    delete(r.failures, item)
}

MaxOfRateLimiter

MaxOfRateLimiter 也可以叫混合限速器,他内部有多个限速器,选择所有限速器中速度最慢(延迟最大)的一种方案。比如内部有三个限速器,When() 接口返回的就是三个限速器里面延迟最大的。

// k8s.io/client-go/util/workqueue/default_rate_limiters.go

// 混合限速器,选择所有限速器中速度最慢的一种方案
type MaxOfRateLimiter struct {
    // 限速器数组
    limiters []RateLimiter   
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
    ret := time.Duration(0)
    // 获取所有限速器里面时间最大的延迟时间
    for _, limiter := range r.limiters {
        curr := limiter.When(item)
        if curr > ret {
            ret = curr
        }
    }
    return ret
}

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
 ret := 0
    // Requeues 次数也是取最大值
    for _, limiter := range r.limiters {
        curr := limiter.NumRequeues(item)
        if curr > ret {
            ret = curr
        }
    }
    return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
    // 循环遍历 Forget
    for _, limiter := range r.limiters {
        limiter.Forget(item)
    }
}

混合限速器的实现非常简单,而在 Kubernetes 中默认的控制器限速器初始化就是使用的混合限速器:

// k8s.io/client-go/util/workqueue/default_rate_limiters.go

// 实例化默认的限速器,由 ItemExponentialFailureRateLimiter 和
// BucketRateLimiter 组成的混合限速器
func DefaultControllerRateLimiter() RateLimiter {
 return NewMaxOfRateLimiter(
  NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
  // 10 qps, 100 bucket 容量
  &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
 )
}

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
 return &ItemExponentialFailureRateLimiter{
  failures:  map[interface{}]int{},
  baseDelay: baseDelay,
  maxDelay:  maxDelay,
 }
}

到这里我们就将限速队列分析完了,接下来我们需要了解下 WorkQueue 在控制器中是如何使用的。