Go 系列文章 10: sync

原文和后续更新:

https://github.com/cch123/golang-notes/blob/master/sync.md

线性一致性模型

从原理上来讲,atomic 操作和非 atomic 操作之间不满足线性一致性模型。这和现代计算机的 CPU 乱序执行,以及 compiler 为优化而进行的指令重排有关。在 C++ 中针对各种场景和性能需求提供了各种 memory order 选项:

  1. memory_order_relaxed Relaxed operation: 只保证当前操作的原子性,不保证其它读写的顺序,也不进行任何多余的同步。就是说 CPU 和编译器可以任意重排其它指令。
  2. memory_order_consume 和 load 搭配使用时,相当于执行了一个 consume 操作,当前线程依赖 loaded 的值的读写都不能被 reorder 到 load 操作之前。其它线程中对依赖的变量的写操作如果 release 了同一个 atomic 变量,在当前线程中马上可见。
  3. memory_order_acquire 只能用在 load 操作中,当前线程中在该 load 之后发生的所有读写,都不能被 reorder 到 load 之前。其它线程中所有写入操作,如果对该 atomic 变量执行了 release 操作,那么其之前的所有写操作在当前线程都看得到。
  4. memory_order_release 只能用在 store 操作中,当前线程中发生在 store 之前的所有读写都不能被 reorder 到 store 操作之后。当前线程在 store 之前发生的所有写操作在其它线程执行同一个 atomic 变量的获取操作之后便都是可见的了。所有对原子变量的写入都会在 consume 相同原子变量的线程中可见。
  5. memory_order_acq_rel 提供给 read-modify-write 操作用。这种操作会既执行 acquire 又执行 release。当前线程中的读写不能被 reorder 到该操作之前或之后。其它线程中对同一 atomic 变量执行 rlease 操作的写在当前线程中执行 rmw 之前都可见,并且 rmw 操作结果对其它 acquire 相同 atomic 变量的线程也是可见的。
  6. memory_order_seq_cst 可以在 load、store 和 rmw 操作中使用。load 操作使用时,相当于执行了 acquire,store 相当于执行了 release,rmw 相当于执行了 acquire 和 release。所有线程间观察到的修改顺序都是一致的。

这里面时序最为严格的是 memory_order_seq_cst,这就是我们常说的“线性一致性”。Go 语言的 atomic 类似这个最严格的时序。简单说明即:

  1. 当一个 goroutine 中对某个值进行 atomic.Store,在另一个 goroutine 中对同一个变量进行 atomic.Load,那么 Load 之后可以看到 Store 的结果,且可以看到 Store 之前的其它内存写入操作(在 C++ 的文档中可能被称为 side effect)。
  2. atomic.Store 全局有序,即你在任何一个 goroutine 中观察到的全局 atomic 变量们的变化顺序一定是一致的,不会出现有违逻辑顺序的出现次序。这个有一些难理解,看一下下面这个 C++ 的例子:
#include <thread>
#include <atomic>
#include <cassert>
 
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
 
void write_x()
{
    x.store(true, std::memory_order_seq_cst);
}
 
void write_y()
{
    y.store(true, std::memory_order_seq_cst);
}
 
void read_x_then_y()
{
    while (!x.load(std::memory_order_seq_cst))
        ;
    if (y.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
 
void read_y_then_x()
{
    while (!y.load(std::memory_order_seq_cst))
        ;
    if (x.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
 
int main()
{
    std::thread a(write_x);
    std::thread b(write_y);
    std::thread c(read_x_then_y);
    std::thread d(read_y_then_x);
    a.join(); b.join(); c.join(); d.join();
    assert(z.load() != 0);  // will never happen
}

在非线性一致的场景下,可能会出现线程 c 和线程 d 观察到的 x,y 值分别为 c: true, false; d: false, true。从而导致最终 z 的结果为 0。

而线性一致的场景下,我们可以用全局事件发生的顺序来推断最终的内存状态。但因为这是最严格的时序,所以 compiler 和硬件同步的成本较高。如果我们的 atomic 变量只用来做全局的简单计数,比如 counter,那么在 Go 中就一定会比 C++ 一类提供了 memory order 选项的语言消耗更多的成本。

但如果 atomic.Load 和 atomic.Store 提供像 C++ 一样的 memory_order 选项,那么又会带给程序员一定的心智负担,所以看起来 Go 官方并不打算提供这样的选项。

atomic 的实现

TEXT ·AddUint32(SB),NOSPLIT,$0-20
    MOVQ    addr+0(FP), BP
    MOVL    delta+8(FP), AX
    MOVL    AX, CX
    LOCK
    XADDL   AX, 0(BP)
    ADDL    AX, CX
    MOVL    CX, new+16(FP)
    RET
0x0036 00054 (atomic.go:10)    MOVL    $10, CX
0x003b 00059 (atomic.go:10)    LOCK
0x003c 00060 (atomic.go:10)    XADDL    CX, (AX)

在 intel 平台上被翻译为:

mov ecx, 0xa
lock xadd DWORD PTR [rax], ecx

lock 指令前缀可以使许多指令操作(ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, and XCHG)变成原子操作。CMPXCHG 指令用来实现 CAS 操作。

atomic.CompareAndSwap 即是使用 lock cmpxchg 来实现的。

在使用 lock 指令时,会导致 CPU 锁总线。

waitgroup

// 在主 goroutine 中 Add 和 Wait,在其它 goroutine 中 Done
// 在第一次使用之后,不能对 WaitGroup 再进行拷贝
type WaitGroup struct {
    noCopy noCopy

    // state1 的高 32 位是计数器,低 32 位是 waiter 计数
    // 64 位的 atomic 操作需要按 64 位对齐,但是 32 位编译器没法保证这种对齐
    // 所以分配 12 个字节(多分配了 4 个字节)
    // 当 state 没有按 8 对齐时,我们可以偏 4 个字节来使用
    // 按 8 对齐时:
    // 0000...0000      0000...0000       0000...0000
    // |- 4 bytes-|    |- 4 bytes -|     |- 4 bytes -|
    //     使用              使用             不使用
    // 没有按 8 对齐时:
    // |- 4 bytes-|    |- 4 bytes -|     |- 4 bytes -|
    //    不使用              使用             使用
    // |-low->  ---------> ------> -----------> high-|
    state1 [12]byte
    sema   uint32
}

func (wg *WaitGroup) state() *uint64 {
    // 判断 state 是否按照 8 字节对齐
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // 已对齐时,使用低 8 字节即可
        return (*uint64)(unsafe.Pointer(&wg.state1))
    } else {
        // 未对齐时,使用高 8 字节
        return (*uint64)(unsafe.Pointer(&wg.state1))
        return (*uint64)(unsafe.Pointer(&wg.state1[4]))
    }
}

// Add 一个 delta,delta 可能是负值,在 WaitGroup 的 counter 上增加该值
// 如果 counter 变成 0,所有阻塞在 Wait 函数上的 goroutine 都会被释放
// 如果 counter 变成了负数,Add 会直接 panic
// 当 counter 是 0 且 Add 的 delta 为正的操作必须发生在 Wait 调用之前。
// 而当 counter > 0 且 Add 的 delta 为负的操作则可以发生在任意时刻。
// 一般来讲,Add 操作应该在创建 goroutine 或者其它需要等待的事件发生之前调用
// 如果 wg 被用来等待几组独立的事件集合
// 新的 Add 调用应该在所有 Wait 调用返回之后再调用
// 参见 wg 的 example
func (wg *WaitGroup) Add(delta int) {
    statep := wg.state()

    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32) // counter 高位 4 字节
    w := uint32(state) // waiter counter,截断,取低位 4 个字节

    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
        return
    }

    // 当前 goroutine 已经把 counter 设为 0,且 waiter 数 > 0
    // 这时候不能有状态的跳变
    // - Add 不能和 Wait 进行并发调用
    // - Wait 如果发现 counter 已经等于 0,则不应该对 waiter 数加一了
    // 这里是对 wg 误用的简单检测
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }

    // 重置 waiter 计数为 0
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(&wg.sema, false)
    }
}

// Done 其实就是 wg 的 counter - 1
// 进入 Add 函数后
// 如果 counter 变为 0 会触发 runtime_Semrelease 通知所有阻塞在 Wait 上的 g
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

// Wait 会阻塞直到 wg 的 counter 变为 0
func (wg *WaitGroup) Wait() {
    statep := wg.state()

    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32) // counter
        w := uint32(state) // waiter count
        if v == 0 { // counter
            return
        }

        // 如果没成功,可能有并发,循环再来一次相同流程
        // 成功直接返回
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(&wg.sema) // 和上面的 Add 里的 runtime_Semrelease 是对应的
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

once

// 内含一个锁和用来做原子操作的变量
type Once struct {
    m    Mutex
    done uint32
}

// Do 被用来执行那些只能执行一次的初始化操作
//     config.once.Do(func() { config.init(filename) })
//
// 因为对 Do 的调用直到其中的一个 f 执行之后才会返回,所以
// f 中不能调用同一个 once 实例的 Do 函数,否则会死锁
// 如果 f 内 panic 了,Do 也认为已经返回了,未来对 Do 的调用不会再执行 f

// once.Do(f) 被调用多次时,只有第一次调用会真正的执行 f
// 对于每一个要执行的 f,都需要一个对应的 once 实例
// 在 done 已经被改成 1 之后
// 所有进入函数调用的行为会用 atomic 读取值之后直接返回
func (o *Once) Do(f func()) {
    // 轻量级的原子变量 load
    if atomic.LoadUint32(&o.done) == 1 {
        // 如果原子 load 后发现已经是 1 了,直接返回
        return
    }

    // Slow-path.
    o.m.Lock()
    defer o.m.Unlock()
    // 在 atomic load 的时候为 0,不代表进入 lock 之后也是 0
    // 所以还需要再判断一次
    // 临界区内的判断和修改是比较稳妥的
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

once.Do 实际上是一种优化,只要过程被执行过了,那么之后所有判断都走 atomic,不用进入临界区。

Mutex

// Mutex 是互斥锁
// 其零值是一个 unlocked 的互斥量
// 在被首次使用之后,Mutex 就不应该发生拷贝动作了
type Mutex struct {
    state int32
    sema  uint32
}

加锁过程:

// 对 m 上锁
// 如果锁已经在使用中
// 调用 Lock 的 goroutine 会陷入阻塞
// 直到 mutex 变为可用
func (m *Mutex) Lock() {
    // 当前直接就是已解锁的 mutex
    // 直接用 atomic cas,更快
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // 这里 if 有 starvation 的判断
        // 在饥饿模式时不能自旋,因为所有权被移交给等待的 goroutine 了
        // 所以我们没办法获得 mutex
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 这里会做积极的自旋
            // 没有其它饥饿的 goroutine 的话,我们尽量直接就设置 mutexWoken flag
            // 这样在 Unlock 的时候就不用唤醒其它被阻塞的 goroutine 了
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                // 设置 mutexWoken flag
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            // 进 runtime 自旋
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old

        // 如果 mutex 处于 starving 状态,就不应该武断地抢锁了
        // 新来的 goroutine 应该先去排队
        if old&mutexStarving == 0 {
            // 说明老状态里没有 starving 那一位
            // 即说明原来的 mutex 不是 starvation 状态
            // 给新的 state 标上 locked 这位
            new |= mutexLocked
        }

        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

        // 当前 goroutine 将 mutex 切换到 starvation 模式
        // 如果 mutex 当前已经被 unlock 了,就不要做这个切换了
        // Unlock 的时候会认为一个 starving 的 mutex 一定会有等待的 goroutine,
        // 这种情况下一定为 true
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }

        if awoke {
            // 当前 goroutine 是处于 awoke 状态
            // 但是从 mutex 里拿到的状态并没有 mutexWoken 这个 flag
            // 说明这里发生了 bug
            // PS: 这种情况下应该是没有 waiters 的
            // PS: 而是当前的加锁的新 goroutine 直接进入唤醒流程
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }

            // goroutine 被从 sleep 唤醒
            // 所以我们需要在两种情况(starving  和非 starving 的)下
            // :都 reset 掉这个 flag
            new &^= mutexWoken
        }

        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }

            // 如果之前已经等待过了,那么直接插到队列最前面
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo)
            // 如果等待时间超过了阈值,那么就进入 starving 状态
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // 如果当前 goroutine 被唤醒,且 mutex 处于 starvation 状态
                // 这时候控制权被移交到给了我们,但 mutex 不知道怎么回事处于不一致的状态:
                // mutexLocked 标识位还没有设置,但我们却仍然认为当前 goroutine 正在等待这个 mutex。说明是个 bug,需要修正
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // 退出饥饿模式
                    // 必须要在这里退出,且考虑等待时间
                    // 饥饿模式很低效,一旦两个 goroutine 同时将 mutex 切换到饥饿模式
                    // 可能会彼此无限地锁下去
                    // ??
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

解锁过程:

// 解锁 m,对未加锁的 mutex 解锁会引起错误
// 被加锁的 mutex 并不是和具体的某个 goroutine 绑定的
// 完全可以在一个 goroutine 中加锁并在另外的 goroutine 中解锁
func (m *Mutex) Unlock() {

    // 干掉 mutexLocked 的标识位
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

    // 如果新的状态表示 mutex 之前没有处于饥饿状态
    if new&mutexStarving == 0 {
        old := new
        for {
            // 如果当前没有处于饥饿模式等待中的 goroutine,或者当前这个 goroutine 已经
            // 被唤醒或抢到了锁,没有必要再唤醒其它 goroutine 了
            // 饥饿模式中,管理权会直接会被直接从 unlocking goroutine 移交给下一个 waiter
            // 当前 goroutine 并不在这个链条中,
            // 因为我们在 unlock 上面的 mutex 时,没有观察到 mutexStarving 的标识位
            // 所以直接 return 让路
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }

            // 获取唤醒其它人的权力
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式: 将 mutex 的所有权移交给下一个 waiter
        // 注意: mutexLocked 没有设置,waiter 被唤醒后会设置这个标识
        // 但是 mutex 在 waiter 被唤醒后,如果 mutexStarving 位是 1 的话
        // 仍然会被认为是上锁的,所以新来的 goroutine 是没法获取这个锁的
        runtime_Semrelease(&m.sema, true)
    }
}

sync.RWMutex

reader 加解锁过程:

// RWMutex 是 reader/writer 互斥锁
// 这种锁可以被任意数量的 reader 或者单独的一个 writer 所持有
// 其零值是遇 unlocked mutex
//
// RWMutex 在首次使用后就不应该被拷贝了
//
// 如果一个 goroutine 持有了 RWMutex 用来做读操作
// 这时候另一个 goroutine 可能会调用 Lock
// 在这之后,就不会有任何 goroutine 会获得 read lock 了
// 直到最初的 read lock 被释放。
// 需要注意,这种锁是禁止递归的 read locking 的。
// 这是为了保证锁最终一定能够到达可用状态;
// 一个阻塞的 Lock 的调用会排它地阻止其它 readers 获取到这个锁
type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}

const rwmutexMaxReaders = 1 << 30

// RLock 锁住 rw 来进行读操作
//
// 不能被使用来做递归的 read locking; 一个阻塞的 Lock 调用会阻止其它新 readers 获取当前锁
func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 有 writer 挂起,等待其操作完毕。
        runtime_Semacquire(&rw.readerSem)
    }
}

// RUnlock 相当于 RLock 调用的逆向操作;
// 其不会影响到其它同时持锁的 reader 们
// 如果当前 rw 不是被锁住读的状态,那么就是一个 bug
func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        if r+1 == 0 || r+1 == -rwmutexMaxReaders {
            throw("sync: RUnlock of unlocked RWMutex")
        }
        // 有 writer 正在挂起
        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
            // 最后一个 reader 负责 unblock writer
            runtime_Semrelease(&rw.writerSem, false)
        }
    }
}

writer 加解锁过程:

// Lock 对 rw 加写锁
// 如果当前锁已经被锁住进行读或者进行写
// Lock 会阻塞,直到锁可用
func (rw *RWMutex) Lock() {
    // First, resolve competition with other writers.
    // 首先需要解决和其它 writer 进行的竞争,这里是去抢 RWMutex 中的 Mutex 锁
    rw.w.Lock()
    // 抢到了上面的锁之后,通知所有 reader,现在有一个挂起的 writer 等待写入了
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // 等待最后的 reader 将其唤醒
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_Semacquire(&rw.writerSem)
    }
}

// Unlock 将 rw 的读锁解锁。如果当前 rw 没有处于锁定读的状态,那么就是 bug
//
// 像 Mutex 一样,一个上锁的 RWMutex 并没有和特定的 goroutine 绑定。
// 可以由一个 goroutine Lock 它,并由其它的 goroutine 解锁
func (rw *RWMutex) Unlock() {

    // 告诉所有 reader 现在没有活跃的 writer 了
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")
    }
    // Unblock 掉所有正在阻塞的 reader
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false)
    }
    // 让其它的 writer 可以继续工作
    rw.w.Unlock()
}

参考资料

http://www.weixianmanbu.com/article/736.html

https://www.cnblogs.com/gaochundong/p/lock_free_programming.html

https://en.cppreference.com/w/cpp/atomic/memory_order

Go 系列文章 10: sync
Share this

家境贫寒,整理不易,客官如果觉得不错欢迎打赏!