Kubernetes 源码学习之限速队列
前面我们详细分析了 client-go 中的延迟队列的实现,接下来就是限速队列的实现,限速队列在我们日常应用中非常广泛,其原理也比较简单,利用延迟队列的特性,延迟某个元素的插入时间来达到限速的目的。
所以限速队列是扩展的延迟队列,在其基础上增加了 AddRateLimited
、Forget
、NumRequeues
3个方法。限速队列接口的定义如下所示:
// k8s.io/client-go/util/workqueue/rate_limiting_queue.go
// RateLimitingInterface 是对加入队列的元素进行速率限制的接口
type RateLimitingInterface interface {
// 延时队列
DelayingInterface
// 在限速器说ok后,将元素item添加到工作队列中
AddRateLimited(item interface{})
// 丢弃指定的元素
Forget(item interface{})
// 查询元素放入队列的次数
NumRequeues(item interface{}) int
}
很明显我们可以看出来限速队列是在延时队列基础上进行的扩展,接下来我们查看下限速队列的实现结构:
// k8s.io/client-go/util/workqueue/rate_limiting_queue.go
// 限速队列的实现
type rateLimitingType struct {
// 同样集成了延迟队列
DelayingInterface
// 因为是限速队列,所以在里面定义一个限速器
rateLimiter RateLimiter
}
// 通过限速器获取延迟时间,然后加入到延时队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
// 直接通过限速器获取元素放入队列的次数
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
// 直接通过限速器丢弃指定的元素
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
我们可以看到限速队列的实现非常简单,就是通过包含的限速器去实现各种限速的功能,所以我们要真正去了解的是限速器的实现原理。
限速器
限速器当然也是一种接口抽象,我们可以实现各种各样的限速器,甚至不限速也可以,该接口定义如下所示:
// k8s.io/client-go/util/workqueue/default_rate_limiter.go
// 限速器接口
type RateLimiter interface {
// 获取指定的元素需要等待的时间
When(item interface{}) time.Duration
// 释放指定元素,表示该元素已经被处理了
Forget(item interface{})
// 返回某个对象被重新入队多少次,监控用
NumRequeues(item interface{}) int
}
可以看到和上面的限速队列的扩展接口方法非常类似,索引在实现的时候我们都是直接调用限速器对应的实现方法。接下来我们来看看几种限速器的具体实现。
BucketRateLimiter
第一个要了解的限速器使用非常频繁 - BucketRateLimiter(令牌桶限速器),这是一个固定速率(qps)的限速器,该限速器是利用 golang.org/x/time/rate
库来实现的,令牌桶算法内部实现了一个存放 token(令牌)的“桶”,初始时“桶”是空的,token 会以固定速率往“桶”里填充,直到将其填满为止,多余的 token 会被丢弃。每个元素都会从令牌桶得到一个 token,只有得到 token 的元素才允许通过,而没有得到 token 的元素处于等待状态。令牌桶算法通过控制发放 token 来达到限速目的。
比如抽奖、抢优惠、投票、报名……等场景,在面对突然到来的上百倍流量峰值,除了消息队列,预留容量以外,我们可以考虑做峰值限流。因为对于大部分营销类活动,消息限流(对被限流的消息直接丢弃,并直接回复:“系统繁忙,请稍后再试。”)并不会对营销的结果有太大影响。
令牌桶是有一个固定大小的桶,系统会以恒定的速率向桶中放 Token,桶满了就暂时不放了,而用户则从桶中取 Token,如果有剩余的 Token 就可以一直取,如果没有剩余的 Token,则需要等到系统中放置了 Token 才行。
golang 中就自带了一个令牌桶限速器的实现,我们可以使用以下方法构造一个限速器对象:
limiter := NewLimiter(10, 1);
该构造函数包含两个参数:
- 第一个参数是
r Limit
。代表每秒可以向 Token 桶中产生多少 token,Limit 实际上是 float64 的别名。 - 第二个参数是
b int
。b 代表 Token 桶的容量大小。
上面我们构造出的限速器含义就是,其令牌桶大小为 1,以每秒 10 个 Token 的速率向桶中放置 Token。
除了直接指定每秒产生的 Token 个数外,还可以用 Every
方法来指定向 Token 桶中放置 Token 的间隔,例如:
limit := Every(100 * time.Millisecond)
limiter := NewLimiter(limit, 1)
以上就表示每 100ms 往桶中放一个 Token,本质上也就是一秒钟产生 10 个。
Limiter 提供了三类方法供用户消费 Token,用户可以每次消费一个 Token,也可以一次性消费多个 Token。而每种方法代表了当 Token 不足时,各自不同的对应手段。
Wait/WaitN
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
Wait 实际上就是 WaitN(ctx,1)
。当使用 Wait 方法消费 Token 时,如果此时桶内 Token 不足时 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 Token 满足条件,当然如果充足则直接返回。我们可以看到,Wait 方法有一个 context 参数,我们可以设置 context 的 Deadline 或者 Timeout,来决定此次 Wait 的最长时间。
Allow/AllowN
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
Allow 实际上就是 AllowN(time.Now(),1)
。AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之返回不消费 Token,false。通常对应这样的线上场景,如果请求速率过快,就直接丢到某些请求。
Reserve/ReserveN
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
Reserve 相当于 ReserveN(time.Now(), 1)
。ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 Token 是否充足,都会返回一个 Reservation
对象指针。
你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间,如果等待时间为 0,则说明不用等待。必须等到等待时间之后,才能进行接下来的工作。或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 Token 归还。
动态调整速率
此外 Limiter 还支持调整速率和桶大小:
-
SetLimit(Limit)
改变放入 Token 的速率 -
SetBurst(int)
改变 Token 桶大小
有了这两个方法,可以根据现有环境和条件,根据我们的需求,动态的改变 Token 桶大小和速率。
令牌桶限速器实现
了解了令牌桶如何使用后,接下来就可以直接查看下令牌桶限速器是如何实现的:
// k8s.io/client-go/util/workqueue/default_rate_limiter.go
// 令牌桶限速器,固定速率(qps)
type BucketRateLimiter struct {
// golang 自带的 Limiter
*rate.Limiter
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
// 获取需要等待的时间(延迟),而且这个延迟是一个相对固定的周期
return r.Limiter.Reserve().Delay()
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
// 固定频率,不需要重试
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
// 不需要重试,因此也不需要忘记
}
令牌桶限速器里面直接包装一个令牌桶 Limiter 对象,直接通过 Limiter.Reserve().Delay()
方法就可以获取元素需要延迟的时间,再使用这个限速器的时候,默认初始化参数为:
// k8s.io/client-go/util/workqueue/default_rate_limiter.go
BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}
通过 rate.NewLimiter 实例化,传入 r 和 b 两个参数,r 表示每秒往“”桶中填充 token 的数量,b 表示令牌桶的大小,这里可以看到默认的参数为速率为10,即每秒放入10个 bucket,桶容量大小为100。
ItemExponentialFailureRateLimiter
ItemExponentialFailureRateLimiter
(指数增长限速器) 是比较常用的限速器,从字面意思解释是元素错误次数指数递增限速器,他会根据元素错误次数逐渐累加等待时间,具体实现如下:
// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 当对象处理失败的时候,其再次入队的等待时间 × 2,到 MaxDelay 为止,直到超过最大失败次数
type ItemExponentialFailureRateLimiter struct {
// 修改失败次数用到的锁
failuresLock sync.Mutex
// 记录每个元素错误次数
failures map[interface{}]int
// 元素延迟基数
baseDelay time.Duration
// 元素最大的延迟时间
maxDelay time.Duration
}
// 实现限速器的When接口
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加错误计数
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// 通过错误次数计算延迟时间,公式是2^i * baseDelay,按指数递增
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
// 最大延迟时间
return r.maxDelay
}
// 计算后的延迟值和最大延迟值之间取最小值
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
// 元素错误次数,直接从 failures 中取
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 直接从 failures 删除指定元素
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
从上面 When() 函数中的算法实现来看,该限速器是出现错误后不断尝试的过程,而且随着尝试次数的增加按照指数增加延迟时间。
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter
(快慢限速器)和 ItemExponentialFailureRateLimiter
很像,都是用于错误尝试的,但是 ItemFastSlowRateLimiter
的限速策略是尝试次数超过阈值用长延迟,否则用短延迟,不过该限速器很少使用。
// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 快慢限速器,先以 fastDelay 为周期进行尝试,超过 maxFastAttempts 次数后,按照 slowDelay 为周期进行尝试
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
// 错误次数计数
failures map[interface{}]int
// 错误尝试阈值
maxFastAttempts int
// 短延迟时间
fastDelay time.Duration
// 长延迟时间
slowDelay time.Duration
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 一样累加错误计数
r.failures[item] = r.failures[item] + 1
// 错误次数还未超过阈值
if r.failures[item] <= r.maxFastAttempts {
// 用短延迟
return r.fastDelay
}
// 超过了用长延迟
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
MaxOfRateLimiter
MaxOfRateLimiter 也可以叫混合限速器,他内部有多个限速器,选择所有限速器中速度最慢(延迟最大)的一种方案。比如内部有三个限速器,When() 接口返回的就是三个限速器里面延迟最大的。
// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 混合限速器,选择所有限速器中速度最慢的一种方案
type MaxOfRateLimiter struct {
// 限速器数组
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
// 获取所有限速器里面时间最大的延迟时间
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
// Requeues 次数也是取最大值
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
// 循环遍历 Forget
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
混合限速器的实现非常简单,而在 Kubernetes 中默认的控制器限速器初始化就是使用的混合限速器:
// k8s.io/client-go/util/workqueue/default_rate_limiters.go
// 实例化默认的限速器,由 ItemExponentialFailureRateLimiter 和
// BucketRateLimiter 组成的混合限速器
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket 容量
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
到这里我们就将限速队列分析完了,接下来我们需要了解下 WorkQueue 在控制器中是如何使用的。
- 使用Topshelf创建Windows 服务
- 自定义AuthorizeAttribute
- 系统进程管理工具Process Explorer
- jquery 操作DOM元素(1)
- 开源的读取Excel文件组件-ExcelDataReader
- BlackPearl 的 ServiceObject 开发部署
- [程序设计语言]-[核心概念]-04:数据类型
- jquery 筛选元素(1)
- [程序设计语言]-00:目录
- 使用 SQL Server 2008 数据类型-xml 字段类型参数进行数据的批量选取或删除数据
- jquery 筛选元素 (2)
- 专家:中国还不是网络强国 今后须打破国外垄断
- jquery 筛选元素 (3)
- ASP.NET安全隐患的临时解决方法
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP strripos函数用法总结
- Laravel5.0+框架邮件发送功能实现方法图文与实例详解
- Ajax+PHP实现的删除数据功能示例
- tensorflow下的图片标准化函数per_image_standardization用法
- 浅析Python面向对象编程
- Python单元测试及unittest框架用法实例解析
- Tensorflow中批量读取数据的案列分析及TFRecord文件的打包与读取
- YII框架实现自定义第三方扩展操作示例
- 在Tensorflow中实现leakyRelu操作详解(高效)
- Django def clean()函数对表单中的数据进行验证操作
- Python3爬虫中Splash的知识总结
- Laravel框架自定义公共函数的引入操作示例
- PHP PDOStatement::setFetchMode讲解
- Python QTimer实现多线程及QSS应用过程解析
- PHP count()函数讲解