锁【Golang】

来自智得网
跳转至: 导航、​ 搜索

简介

通过通信共享数据,而不是通过共享内存共享数据是Golang中的并发理念,通信共享的数据只在自己的程序上下文有效,不会把副作用扩散到其他部分,所以通过通信共享数据更加简单,效率也更高。但是依然有场景需要不同线程之间共享数据,共享临界区的数据需要同步机制,通常也就是锁。

Golang的锁分为普通的互斥锁以及读写锁,除了需要阻塞等待的锁之外,Golang还提供了原子操作类sync/atomic,atomic可以实现自旋锁。

原理

互斥锁

sync.Mutex是Golang中普通的互斥锁的实现,锁的实现需要下列组件。

临界区状态的标识,代表临界区是否已经被占用。

请求锁的流程,请求锁的过程如果临界区还没有被锁定,则直接加锁,如果已经被其他线程锁定,sync.Mutex首先通过自旋的方式等待,通过若干次自旋仍然不能获取锁,则通过信号量阻塞当前线程。

锁的释放,通过信号唤醒阻塞的线程。

sync.Mutex的定义如下:

type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota   // mutexWaiterShift值为3,通过右移3位的位运算,可计算waiter个数
    starvationThresholdNs = 1e6 // 1ms,进入饥饿状态的等待时间
)

Golang 语言中的 Mutex 实现在进入阻塞状态之前会采用自旋的方式来尝试加锁/解锁。

  • 加锁
    • 调用 Lock 加锁是在没有冲突的情况下直接使用 CAS 加锁。
    • 加锁失败的情况下首先尝试自旋等待锁释放,进入自旋的条件是多核、GOMAXPROCS > 1 并且 mutex 是非饥饿模式的情况。
    • 如果自旋尝试超过4次仍未获取锁,则按照 FIFO 的规则加入队列等待。
  • 解锁
    • 调用Unlock时如果没有冲突,则直接解锁。
    • 否则就去唤醒等待队列中的一个 goroutine 去获取锁。

Mutex 可能处于两种不同的模式,正常模式和饥饿模式,两种模式类似于非公平锁和公平锁。

正常模式下,在队列中等待的 goroutine 和新的 goroutine 一起竞争 mutex 的使用权。 新的 goroutine 可能数量更多,而且正在CPU上运行,所以被唤醒的等待者有一定概率获取不到锁。

为了避免 goroutine 长期不能获取锁而发生饿死现象,Golang 的 Mutex 提供了饥饿模式。 当 goroutine 等待 mutex 的时间超过 1ms, mutex 就会切换到饥饿模式。

在饥饿模式下,Mutex 会直接从等待队列的头部获取锁的下一个持有者。 新的 goroutine 不会去获取 mutex,也不进行自旋, 而是直接排到等待队列的尾部。

饥饿模式在以下两种情况下会切回到正常模式:

  1. 获取锁的 goroutine 是队列中最后的一个等待者。
  2. 获取锁的 goroutine 等待时长少于 1ms。

读写锁

对于读写锁而言,同样需要包含临界区标识,请求和释放锁的能力。

但是读写锁因为读读可以并行,读写需要互斥,所以状态标识以及新号量都需要两个。

type RWMutex struct {
    w           Mutex  
    writerSem   uint32 
    readerSem   uint32 
    readerCount int32  
    readerWait  int32  
}

读写锁的语意需要解决如下问题:

  • 读锁是共享锁,多个读操作可以共享读锁,读锁需要阻塞写锁。
  • 写锁是互斥锁,多个写操作只有一个可以获取写锁,写锁同时阻塞读锁和写锁。

总结就是读读共享、写写互斥、读写互斥。

  • 读读共享

多个读操作获取读锁时,读锁计数进行加一

atomic.AddInt32(&rw.readerCount, 1)
  • 读写互斥

已有写锁,获取读锁进入读信号的等待队列:

if atomic.AddInt32(&rw.readerCount, 1) < 0 { 
    //readerCount+1<0说明存在写线程,该进入读信号量的等待队列 
    runtime_SemacquireMutex(&rw.readerSem, false, 0) 
}

已有读锁,获取写锁的时候:

//r表示当前读线程的个数
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
//将readerWait加上当前读进程的个数,表示当前写进程需要等待readerCount个读线程释放锁
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
 //如果有其他读进程或者需要等待读进程释放个数>0(即readerCount>0),则进入写信号量等待队列
   runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
  • 写写互斥
//通过互斥锁来实现 
rw.w.Lock()

Atomic

对变量进行并发安全的修改除了使用 Mutex 等锁机制外,sync/atomic 包的原子操作也可以实现。

atomic 包提供了一系列的原子操作,这些原子操作是在硬件层次,通过 CPU 指令实现的。sync/atomic中有一个重要的操作CompareAndSwap。通过该方法可以实现自旋锁。

atomic 包提供的原子操作主要包括 Add、CompareAndSwap、Load、Store、Swap。

// atomic 的 Add 是针对 int 和 uint 进行原子加操作。
func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

// 比较并交换方法实现了类似乐观锁的功能。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

// Load 方法可以避免读取过程中其他协程的修改动作。
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

// Sotre 方法通过 unsafe.Pointer 指针实现原子性的修改。
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

// Swap 方法提供了值原子交换的能力,可以交换基本类型包括指针。
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)