《concurrency in go》这本书出版于 2017 年八月,里面有些观点还是蛮新颖的,烂大街的我就先不写了,重点写写书里提到的,我之前忽视的观点,以及一些奇技淫巧。
锁的粒度太大的话,有可能造成其它的 goroutine 饥饿
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
var sharedLock sync.Mutex
const runtime = 1 * time.Second
greedyWorker := func() {
defer wg.Done()
var count int
for begin := time.Now(); time.Since(begin) <= runtime; {
sharedLock.Lock()
time.Sleep(3 * time.Nanosecond)
sharedLock.Unlock()
count++
}
fmt.Printf("Greedy worker was able to execute %v work loops\n", count)
}
politeWorker := func() {
defer wg.Done()
var count int
for begin := time.Now(); time.Since(begin) <= runtime; {
sharedLock.Lock()
time.Sleep(1 * time.Nanosecond)
sharedLock.Unlock()
sharedLock.Lock()
time.Sleep(1 * time.Nanosecond)
sharedLock.Unlock()
sharedLock.Lock()
time.Sleep(1 * time.Nanosecond)
sharedLock.Unlock()
count++
}
fmt.Printf("Polite worker was able to execute %v work loops.\n", count)
}
wg.Add(2)
go greedyWorker()
go politeWorker()
wg.Wait()
}
在我的 Mac 上跑的结果:
Greedy worker was able to execute 507672 work loops
Polite worker was able to execute 297669 work loops.
锁的粒度大,会得到更多的运行时间,而导致其它 goroutine 获得的执行机会变少。
close 各种状态的 channel 的结果对照表:
这里面 close receive only 的 channel 发现和我的直觉有所不同。。会直接编译错误。
利用作用域来约束 channel 的逻辑
channel 一般都有发送端和接收端,所以声明、定义的时候一般是这么写的:
var a = make(chan int, 10)
如果我们的消费者和生产者代码紧跟着上面的代码的话,那么可能就写成这样:
var a = make(chan int, 10)
go func() {
for {
select {
case <- a:
// do some thing
}
}
}()
// producer...
当然,非得这么写也不是说就一定会有问题。但随着项目升级,代码被改得面目全非的时候,保不准会有人在 consumer 里向全局的 channel 里塞数据,从而造成错误的结果。我们还可以有更好的写法来规避这个问题:
consumer := func(ch <-chan int) {
for {
select {
case <- ch:
// do some thing
}
}
}
var a = make(chan int, 10)
go consumer(a)
将 channel 的定义放在 consumer 定义的后面,从作用域上彻底根绝了在 consumer 里直接操作 a channel 的可能性。
通过 done channel 避免 goroutine 泄露
package main
import (
"fmt"
"time"
)
func main() {
doWork := func(
done <-chan interface{},
strings <-chan string,
) <-chan interface{} { // <1>
terminated := make(chan interface{})
go func() {
defer fmt.Println("doWork exited.")
defer close(terminated)
for {
select {
case s := <-strings:
// Do something interesting
fmt.Println(s)
case <-done: // <2>
return
}
}
}()
return terminated
}
done := make(chan interface{})
terminated := doWork(done, nil)
go func() { // <3>
// Cancel the operation after 1 second.
time.Sleep(1 * time.Second)
fmt.Println("Canceling doWork goroutine...")
close(done)
}()
<-terminated // <4>
fmt.Println("Done.")
}
这个其实平常用的已经挺多的了。。
多个 ch 中有一个返回结果即退出的 or channel
package main
// if one of the channel is ready
// then the or channel will be ready
import (
"fmt"
"time"
)
func main() {
var or func(chs ...chan interface{}) chan interface{}
or = func(chs ...chan interface{}) chan interface{} {
if len(chs) == 0 {
return nil
}
if len(chs) == 1 {
return chs[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(chs) {
case 2:
select {
case <-chs[0]:
case <-chs[1]:
}
default: // len(chs) > 2
select {
case <-chs[0]:
case <-chs[1]:
case <-chs[2]:
case <-or(append(chs[3:], orDone)...):
}
}
}()
return orDone
}
start := time.Now()
sig := func(after time.Duration) chan interface{} {
ch := make(chan interface{})
go func() {
defer close(ch)
time.Sleep(after)
}()
return ch
}
<-or(
sig(time.Second),
sig(time.Second*2),
sig(time.Minute),
)
fmt.Println(time.Since(start))
}
作者声称是在多个数据源,有一个返回时即可返回给用户的场景下,这种模式比较好使。不过我好像还没碰到过这种场景 orz。
用 fan-in fan-out 加快整个 pipeline 流程
这个实在没啥好说的,channel 然后又 channel 的场景,哪一步慢了,就在那里做并行。
用 bridge channel 简化 channel in channel 的消费代码
package main
import (
"fmt"
)
func main() {
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
bridge := func(
done <-chan interface{},
chanStream <-chan <-chan interface{},
) <-chan interface{} {
valStream := make(chan interface{}) // <1>
go func() {
defer close(valStream)
for { // <2>
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) { // <3>
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}
genVals := func() <-chan <-chan interface{} {
chanStream := make(chan (<-chan interface{}))
go func() {
defer close(chanStream)
for i := 0; i < 10; i++ {
stream := make(chan interface{}, 1)
stream <- i
close(stream)
chanStream <- stream
}
}()
return chanStream
}
for v := range bridge(nil, genVals()) {
fmt.Printf("%v ", v)
}
}
也是作者声称有些场景下需要在 channel 中套 channel,来保证一定的“发送顺序”,不过原谅我又没见过相关的场景。。。
context 使用
嗯,大致意思就是 context 是不可变的,可以用 WithCancel 和 WithTimeout 得到一个 cancel 方法,然后直接调用来取消整个 call graph,不过在 Go 中,如果在子 goroutine 中要支持取消的话,始终需要写:
select {
case <-ctx.Done():
case ...
}
而且理论上要被取消时尽快返回,所以对下游的代码侵入非常大,不知道其它语言是怎么解决的,抽时间调研一下~
书里的代码我 clone 了一份,放在 github 上,如果你觉得有意思的话,推荐也购(下)买(载)一本来看。