剖析Go的读写锁

时间:2022-05-06
本文章向大家介绍剖析Go的读写锁,主要内容包括一个(神奇)优秀的(大坑)特性、源码剖析、名词解释、参考文献、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
package main

import (

	"fmt"

	"sync"

	"time"

)

func main() {

	rw := new(sync.RWMutex)

	for i := 0; i < 2; i++ {   // 建立两个写者

		go func() {

			for j := 0; j < 3; j++ {

				rw.Lock()

				// 写

				rw.Unlock()

			}

		}()

	}

	for i := 0; i < 5; i++ {    // 建立两个读者

		go func() {

			for j := 0; j < 3; j++ {

				rw.RLock()

				// 读

				rw.RUnlock()

			}

		}()

	}

	time.Sleep(time.Second)

	fmt.Println("Done")

}

PlayGround

一个(神奇)优秀的(大坑)特性

读者在读的时候,不能够假定别的读者也能够获得锁。因此,禁止读锁嵌套。

是不是有点儿绕?下面举个“七秒例”:?

  • 第一秒:读者1在第1秒成功申请了读锁
  • 第二秒:写者1在第2秒申请写锁,申请失败,阻塞,但它会防止新的读者获锁
  • 第三秒:读者2在第3秒申请读锁,申请失败
  • 第四秒:读者1释放读锁,写者1获得写锁
  • 第五秒:写者1释放写锁,读者2获得读锁
  • 第六秒:读者1再次申请读锁,申请成功,与读者2共享
  • 第七秒:读者1、读者2释放读锁,结束

当写锁阻塞时,新的读锁是无法申请的,这可以有效防止写者饥饿。如果一个线程因为某种原因,导致得不到CPU运行时间,这种状态被称之为 饥饿

然而,这种机制也禁止了读锁嵌套。读锁嵌套可能造成死锁:

package main

import (

	"fmt"

	"sync"

	"time"

)

func main() {

	rw := new(sync.RWMutex)

	var deadLockCase time.Duration = 1

	go func() {

		time.Sleep(time.Second * deadLockCase)

		fmt.Println("Writer Try")

		rw.Lock()

		fmt.Println("Writer Fetch")

		time.Sleep(time.Second * 1)

		fmt.Println("Writer Release")

		rw.Unlock()

	}()

	fmt.Println("Reader 1 Try")

	rw.RLock()

	fmt.Println("Reader 1 Fetch")

	time.Sleep(time.Second * 2)

	fmt.Println("Reader 2 Try")

	rw.RLock()

	fmt.Println("Reader 2 Fetch")

	time.Sleep(time.Second * 2)

	fmt.Println("Reader 1 Release")

	rw.RUnlock()

	time.Sleep(time.Second * 1)

	fmt.Println("Reader 2 Release")

	rw.RUnlock()

	time.Sleep(time.Second * 2)

	fmt.Println("Done")

}

读者1和读者2是嵌套关系,按照这种时间安排,上述程序会导致死锁。

而有些死锁的可怕之处就在于,它不一定会发生。假设上面程序中的time.Sleep都是随机的时间,那么这一段代码每次的结果有可能不一致,这会给Debug带来极大的困难。

吾闻读锁莫嵌套,写锁嵌套长已矣。(读锁嵌套了还有概率成功,写锁嵌套了100%完蛋?)

源码剖析

(源码具体内容、行数,以版本go version 1.8.1为例。)

为了方便理解,可以把所有的if race.Enabled {...}扔掉不看。接下来,我们重述“七秒例”。?

第一秒,读者1请求读锁。

Line41:

	if atomic.AddInt32(&rw.readerCount, 1) < 0 {

		// A writer is pending, wait for it.

		runtime_Semacquire(&rw.readerSem)

	}

读者数量readerCount开始是0,这个时候加1,变成了1,不符合判负条件所以跳出,成功获得读锁一枚。

第二秒,写者尝试获取写锁。第85行获取w的锁。不管这个读写锁有没有获取成功,先排斥别的写者。


Line85:

	// First, resolve competition with other writers.

	rw.w.Lock()

	// Announce to readers there is a pending writer.

	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

	// Wait for active readers.

	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {

		runtime_Semacquire(&rw.writerSem)

	}

刚才说了,一个写者阻塞在这里的时候,也不会让新的读者去读了,所以它干了一件非常坏的事情: 把readerCount变成了1-rwmutexMaxReaders。 这样就能卡住新来的读者了。 接下来,算出r等于1。这意味着有当前有写者存在。 因为有读者,所以写者卡在了信号量writerSem上。但是它不甘心啊,心想“等完现在的这几个读者,我就要去写!”,它默默地把现在占有读锁的人记在了小本本rw.readerWait上。在本例子中,readerWait被设置为了1。

第三秒,读者2尝试获得读锁,它又来到了第41行,结果发现读者的数量是1-rwmutexMaxReaders,好吧,它只好卡在信号量readerSem上。

第四秒,读者1调用RUnlock(),它首先把读者数量减一,毕竟自己已经不读了。

Line61:

	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {

		// A writer is pending.

		if atomic.AddInt32(&rw.readerWait, -1) == 0 {

			// The last reader unblocks the writer.

			runtime_Semrelease(&rw.writerSem)

		}

	}

在读者数量减一的时候,它发现读者数量是负数,这回读者1明白了,有一个写者在等待写。估计读者1自己已经在这个写者的小本本readerWait上了,因此它把readerWait减一,表示自己不读了。这时候读者1发现自己就是最后一个读者了,所以赶紧祭出writerSem,让写者可以去写。 读者1释放了writerSem信号量以后,写者很快就收到了这个提醒,兴高采烈地获得了写锁,开始自己的写作生涯。

读者2还卡着呢…

第五秒,写者1写完了一稿便不想写了,调用Unlock()准备释放读锁。

Line114:

	// Announce to readers there is no active writer.

	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

	// Unblock blocked readers, if any.

	for i := 0; i < int(r); i++ {

		runtime_Semrelease(&rw.readerSem)

	}

只见他重新为readerCount加上rwmutexMaxReaders,使他重新变为了正数。这个正数恰好也是阻塞的读者的数量。 接下来,写者按照这个读者的数量,释放了这么多的readerSem信号量,相当于将所有阻塞的读者一一唤醒。读者2在收到readerSem的那一刻喜极而泣,它终于可以读了。

第六秒,读者1又来了,它把读者数量加1,发现它是正数哎,写者现在又没来,它再次幸运地瞬间获得读锁,与读者2一起读了起来。

第七秒,读者1和读者2都释放了自己的读锁。至此,结束。

名词解释

中文

英文

解释

信号量 (也称信号灯)

Semaphore

条件变量

Condition

互斥量

Mutex

参考文献

  1. Wikipedia: Semaphore (programming))