concurrency in go 读书笔记

《concurrency in go》这本书出版于 2017 年八月,里面有些观点还是蛮新颖的,烂大街的我就先不写了,重点写写书里提到的,我之前忽视的观点,以及一些奇技淫巧。

锁的粒度太大的话,有可能造成其它的 goroutine 饥饿

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	var sharedLock sync.Mutex
	const runtime = 1 * time.Second

	greedyWorker := func() {
		defer wg.Done()

		var count int
		for begin := time.Now(); time.Since(begin) <= runtime; {
			sharedLock.Lock()
			time.Sleep(3 * time.Nanosecond)
			sharedLock.Unlock()
			count++
		}

		fmt.Printf("Greedy worker was able to execute %v work loops\n", count)
	}

	politeWorker := func() {
		defer wg.Done()

		var count int
		for begin := time.Now(); time.Since(begin) <= runtime; {
			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			sharedLock.Lock()
			time.Sleep(1 * time.Nanosecond)
			sharedLock.Unlock()

			count++
		}

		fmt.Printf("Polite worker was able to execute %v work loops.\n", count)
	}

	wg.Add(2)
	go greedyWorker()
	go politeWorker()

	wg.Wait()
}

在我的 Mac 上跑的结果:

Greedy worker was able to execute 507672 work loops
Polite worker was able to execute 297669 work loops.

锁的粒度大,会得到更多的运行时间,而导致其它 goroutine 获得的执行机会变少。

close 各种状态的 channel 的结果对照表:

close_chan

这里面 close receive only 的 channel 发现和我的直觉有所不同。。会直接编译错误。

利用作用域来约束 channel 的逻辑

channel 一般都有发送端和接收端,所以声明、定义的时候一般是这么写的:

var a = make(chan int, 10)

如果我们的消费者和生产者代码紧跟着上面的代码的话,那么可能就写成这样:

var a = make(chan int, 10)

go func() {
    for {
        select {
            case <- a:
                // do some thing
        }
    }
}()

// producer...

当然,非得这么写也不是说就一定会有问题。但随着项目升级,代码被改得面目全非的时候,保不准会有人在 consumer 里向全局的 channel 里塞数据,从而造成错误的结果。我们还可以有更好的写法来规避这个问题:

consumer := func(ch <-chan int) {
    for {
        select {
            case <- ch:
                // do some thing
        }
    }
}

var a = make(chan int, 10)

go consumer(a)

将 channel 的定义放在 consumer 定义的后面,从作用域上彻底根绝了在 consumer 里直接操作 a channel 的可能性。

通过 done channel 避免 goroutine 泄露

package main

import (
	"fmt"
	"time"
)

func main() {
	doWork := func(
		done <-chan interface{},
		strings <-chan string,
	) <-chan interface{} { // <1>
		terminated := make(chan interface{})
		go func() {
			defer fmt.Println("doWork exited.")
			defer close(terminated)
			for {
				select {
				case s := <-strings:
					// Do something interesting
					fmt.Println(s)
				case <-done: // <2>
					return
				}
			}
		}()
		return terminated
	}

	done := make(chan interface{})
	terminated := doWork(done, nil)

	go func() { // <3>
		// Cancel the operation after 1 second.
		time.Sleep(1 * time.Second)
		fmt.Println("Canceling doWork goroutine...")
		close(done)
	}()

	<-terminated // <4>
	fmt.Println("Done.")
}

这个其实平常用的已经挺多的了。。

多个 ch 中有一个返回结果即退出的 or channel

package main

// if one of the channel is ready
// then the or channel will be ready

import (
	"fmt"
	"time"
)

func main() {
	var or func(chs ...chan interface{}) chan interface{}
	or = func(chs ...chan interface{}) chan interface{} {
		if len(chs) == 0 {
			return nil
		}

		if len(chs) == 1 {
			return chs[0]
		}

		orDone := make(chan interface{})

		go func() {
			defer close(orDone)
			switch len(chs) {
			case 2:
				select {
				case <-chs[0]:
				case <-chs[1]:
				}
			default: // len(chs) > 2
				select {
				case <-chs[0]:
				case <-chs[1]:
				case <-chs[2]:
				case <-or(append(chs[3:], orDone)...):
				}
			}
		}()

		return orDone
	}

	start := time.Now()

	sig := func(after time.Duration) chan interface{} {
		ch := make(chan interface{})

		go func() {
			defer close(ch)
			time.Sleep(after)
		}()

		return ch
	}
	<-or(
		sig(time.Second),
		sig(time.Second*2),
		sig(time.Minute),
	)
	fmt.Println(time.Since(start))

}

作者声称是在多个数据源,有一个返回时即可返回给用户的场景下,这种模式比较好使。不过我好像还没碰到过这种场景 orz。

用 fan-in fan-out 加快整个 pipeline 流程

这个实在没啥好说的,channel 然后又 channel 的场景,哪一步慢了,就在那里做并行。

用 bridge channel 简化 channel in channel 的消费代码

package main

import (
	"fmt"
)

func main() {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}
	bridge := func(
		done <-chan interface{},
		chanStream <-chan <-chan interface{},
	) <-chan interface{} {
		valStream := make(chan interface{}) // <1>
		go func() {
			defer close(valStream)
			for { // <2>
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if ok == false {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) { // <3>
					select {
					case valStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}
	genVals := func() <-chan <-chan interface{} {
		chanStream := make(chan (<-chan interface{}))
		go func() {
			defer close(chanStream)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				chanStream <- stream
			}
		}()
		return chanStream
	}

	for v := range bridge(nil, genVals()) {
		fmt.Printf("%v ", v)
	}
}

也是作者声称有些场景下需要在 channel 中套 channel,来保证一定的“发送顺序”,不过原谅我又没见过相关的场景。。。

context 使用

嗯,大致意思就是 context 是不可变的,可以用 WithCancel 和 WithTimeout 得到一个 cancel 方法,然后直接调用来取消整个 call graph,不过在 Go 中,如果在子 goroutine 中要支持取消的话,始终需要写:

select {
case <-ctx.Done():
case ...
}

而且理论上要被取消时尽快返回,所以对下游的代码侵入非常大,不知道其它语言是怎么解决的,抽时间调研一下~

书里的代码我 clone 了一份,放在 github 上,如果你觉得有意思的话,推荐也购(下)买(载)一本来看。

https://github.com/cch123/concurrency-in-go-src

Xargin

Xargin

If you don't keep moving, you'll quickly fall behind
Beijing