fasthttp中的协程池实现
协程池可以控制并行度,复用协程。fasthttp 比 net/http 效率高很多倍的重要原因,就是利用了协程池。实现并不复杂,我们可以参考他的设计,写出高性能的应用。
入口
// server.go
func (s *Server) Serve(ln net.Listener) error {
var lastOverflowErrorTime time.Time
var lastPerIPErrorTime time.Time
var c net.Conn
var err error
maxWorkersCount := s.getConcurrency()
s.concurrencyCh = make(chan struct{}, maxWorkersCount)
wp := &workerPool{
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
Logger: s.logger(),
}
// break-01
wp.Start()
for {
// break-02
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
wp.Stop()
if err == io.EOF {
return nil
}
return err
}
// break-03
if !wp.Serve(c) {
s.writeFastError(c, StatusServiceUnavailable,
"The connection cannot be served because Server.Concurrency limit exceeded")
c.Close()
if time.Since(lastOverflowErrorTime) > time.Minute {
s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
"Try increasing Server.Concurrency", maxWorkersCount)
lastOverflowErrorTime = CoarseTimeNow()
}
// The current server reached concurrency limit,
// so give other concurrently running servers a chance
// accepting incoming connections on the same address.
//
// There is a hope other servers didn't reach their
// concurrency limits yet :)
time.Sleep(100 * time.Millisecond)
}
c = nil
}
}
// 有必要了解一下 workerPool 的结构
type workerPool struct {
// Function for serving server connections.
// It must leave c unclosed.
WorkerFunc func(c net.Conn) error
MaxWorkersCount int
LogAllErrors bool
MaxIdleWorkerDuration time.Duration
Logger Logger
lock sync.Mutex
workersCount int
mustStop bool
ready []*workerChan
stopCh chan struct{}
workerChanPool sync.Pool
}
goroutine status:
- main0: wp.Start()
break-01
// workerpool.go
// 启动一个 goroutine, 每隔一段时间,清理一下 []*workerChan;
// wp.clean() 的操作是 查看最近使用的workerChan, 如果他的最近使用间隔大于某个值,那么把这个workerChan清理了。
func (wp *workerPool) Start() {
if wp.stopCh != nil {
panic("BUG: workerPool already started")
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
go func() {
var scratch []*workerChan
for {
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
goroutine status:
- main0: wp.Start()
- g1: for loop to clean idle workerChan
break-02
acceptConn(s, ln, &lastPerIPErrorTime)
主要处理 ln.Accept(),判断err是否是 Temporary 的,最终返回一个 net.Conn
break-03
// workerpool.go
func (wp *workerPool) Serve(c net.Conn) bool {
// break-04
ch := wp.getCh()
if ch == nil {
return false
}
ch.ch <- c
return true
}
type workerChan struct {
lastUseTime time.Time
ch chan net.Conn
}
wp.getCh() 返回一个 *workerChan, 可以看到, workerChan 有一个 ch 属性,参数传入的 net.Conn 直接往里面塞。
break-04
// workerpool.go
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
// ready 为空,并且总数小于 MaxWorkersCount,那么需要创建新的 workerChan
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
// 从队列尾部取出一个 workerChan
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()
if ch == nil {
if !createWorker {
return nil
}
// 走入创建流程,从 Pool中取出 workerChan
vch := wp.workerChanPool.Get()
if vch == nil {
vch = &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
ch = vch.(*workerChan)
// 创建goroutine处理请求,接收一个 chan *workerChan 作为参数
go func() {
// break-05
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
}
这里我们只看创建的流程。如果ready为空,说明ready被耗尽,并且小于 MaxWorkersCount,那么需要创建新的 workerChan。 创建时,先从 Pool 中取出复用,如果为nil,再创建新的。 可以预测到,这里 wp.workerFunc(ch) 必定包含一个 for 循环,处理 workerChan 中的 net.Conn。
goroutine status:
- main0: wp.Start()
- g1: for loop to clean idle workerChan
- g2: wp.workerFunc(ch) blocks for handling connection
break-05
// workerpool.go
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
for c = range ch.ch {
if c == nil {
break
}
// 正真的处理请求的函数
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
errStr := err.Error()
if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
strings.Contains(errStr, "reset by peer") ||
strings.Contains(errStr, "i/o timeout")) {
wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
}
}
if err != errHijacked {
c.Close()
}
c = nil
// 释放 workerChan
// break-06
if !wp.release(ch) {
break
}
}
// 跳出 for range 循环, 意味着 从chan中取得一个 nil,或者 wp.mustStop 被设为了true,这是主动停止的方法。
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}
for range 不断从 chan net.Conn 中获取连接。大家是否还记得 在 func (wp *workerPool) Serve(c net.Conn) bool
函数中,一个重要操作就是把 accept 到的connection,放入 channel.
最后,需要把当前的 workerChan 释放回 workerPool 的 ready 中。
break-06
func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = CoarseTimeNow()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
return true
}
释放操作中,注意到 修改了 ch.lastUseTime , 还记得 clean 操作吗?在 g1 协程中运行着呢。 所以最后的运行状态是:
goroutine status:
- main0: wp.Start()
- g1: for loop to clean idle workerChan
- g2: wp.workerFunc(ch) blocks for handling connection
- g3: ....
- g4: ....
按需增长 goroutine 数量,但是也有一个最大值, 所以并行度是可控的。当请求密集时,一个 worker goroutine 可能会串行处理多个 connection。 wokerChan 在 Pool 中被复用,对GC的压力会减小很多。
而对比原生的 net/http 包,并行度不可控(可能不确定,runtime 会有控制? ),goroutine 不可被复用,体现在一个请求一个goroutine, 用完就销毁了,对机器压力更大。
本文来自:Segmentfault
感谢作者:一堆好人卡
查看原文:fasthttp中的协程池实现
- 超级玛丽游戏
- POJ 3673 Cow Multiplication
- HDU 5144 NPY and shot(物理运动学+三分查找)
- 深度|Python股票数据分析
- HDU 2438 Turn the corner(三分查找)
- UVAlive 3708 Graveyard(最优化问题)
- HDU 1754 I Hate It(线段树之单点更新,区间最值)
- Selenium2+python自动化19-单选和复选框
- Uva 11300 Spreading the Wealth(递推,中位数)
- Uva 11729 Commando War (简单贪心)
- UVA 11292 Dragon of Loowater(简单贪心)
- Codeforces Beta Round #2 A,B,C
- 牛顿迭代法(Newton's Method)
- 最长递减子序列(nlogn)(个人模版)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- PHP中str_split()函数的用法讲解
- 微信JSSDK分享功能图文实例详解
- spring-boot-route(八)整合mybatis操作数据库
- PHP扩展Swoole实现实时异步任务队列示例
- ThinkPHP框架下微信支付功能总结踩坑笔记
- spring-boot-route(九)整合JPA操作数据库
- spring-boot-route(十)多数据源切换
- spring-boot-route(十一)数据库配置信息加密
- PHP中number_format()函数的用法讲解
- php7新特性的理解和比较总结
- PHP之认识(二)关于Traits的用法详解
- 详细分析Python可变对象和不可变对象
- spring-boot-route(十二)整合redis做为缓存
- ThinkPHP框架实现的微信支付接口开发完整示例
- spring-boot-route(十三)整合RabbitMQ消息队列