一些连接池相关的总结

http 标准库

服务端

请求处理

package main

import (
	"io"
	"log"
	"net/http"
)

func sayhello(wr http.ResponseWriter, r *http.Request) {
	wr.Header()["Content-Type"] = []string{"application/json"}
	io.WriteString(wr, "hello")
}

func main() {
	http.HandleFunc("/", sayhello)
	http.ListenAndServe(":9090", nil)
}

1-1

每一个请求启动一个 goroutine,读取完毕之后,调用用户传入的 handler(没有的话就用默认的),在同一连接进行 response 响应。整体上是个 request/response loop 模型。

客户端

连接池

type Transport struct {
	idleMu       sync.Mutex
	closeIdle    bool                                // user has requested to close all idle conns
	idleConn     map[connectMethodKey][]*persistConn // most recently used at end
	idleConnWait map[connectMethodKey]wantConnQueue  // waiting getConns
	idleLRU      connLRU

	connsPerHostMu   sync.Mutex
	connsPerHost     map[connectMethodKey]int
	connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns

    // MaxIdleConns controls the maximum number of idle (keep-alive)
	// connections across all hosts. Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
	// (keep-alive) connections to keep per-host. If zero,
	// DefaultMaxIdleConnsPerHost is used.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost optionally limits the total number of
	// connections per host, including connections in the dialing,
	// active, and idle states. On limit violation, dials will block.
	//
	// Zero means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout is the maximum amount of time an idle
	// (keep-alive) connection will remain idle before closing
	// itself.
	// Zero means no limit.
	IdleConnTimeout time.Duration
}

transport 和 client 是一一对应,每个 tranport 内有自己的 connpool, idleConn 的结构是:map[connectMethodKey][]*persistConn,这个 map 的 key 是个数据结构:

// connectMethodKey is the map key version of connectMethod, with a
// stringified proxy URL (or the empty string) instead of a pointer to
// a URL.
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

proxy 地址 + 协议 + 地址,以及是否只支持 http1,构成该 map 的 key,proxy 地址是完整的 proxy 地址,比如 export HTTP_PROXY=localhost:1081,则该地址为用户提供的字符串。scheme 一般是 http:// 或 https:// 之类的字符串,addr 包含完整的域名(或 IP)和端口。

getConn:

2

在 http2 中,同一个连接可以被重复使用,所以 http2 的逻辑里,该连接被返回后仍然保持在连接池里。是否可以重复使用由 pconn.alt 来决定。

tryPutIdleConn

3

如果有正在等待连接的 goroutine,那么就把这条连接 deliver 给相应的 goroutine,这会触发相应的 ready 操作,使阻塞中的 goroutine 被唤醒继续处理请求。

否则将连接放回到 Transport 的 idleConn 和 idleLRU 中。

readloop 和 writeloop

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}

所以每个 conn 都会有相应的 readloop 和 writeloop,因此每个连接至少有两个 goroutine。

用户协程在使用 http.Client 发送请求时,一路到 http.Transport.roundTrip -> http.persistConn.roundTrip:

	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

在该函数中,将 request 和接收请求的 ch 传入到 reqch,把 writeRequest 写入到 writech。

  • writeloop 从 writech 中收到了写请求,会把内容写入到 conn 上,这个请求也就发给 server 端了
  • readloop 收到 requestAndChan 结果,上面 writeloop 相当于已经把请求数据发送到 server 端,readloop 这时候可以从 conn 上读出 server 发回的 response 数据,所以 readloop 主要做的就是 ReadResponse,然后把 response 的内容写入到 requestAndChan.ch 中。
  • 主协程只要监听 requestAndChan.ch 来接收相应的 response 即可(用 select 同时监听 err、连接关闭等 chan)。

这里 http 标准库的做法要参考一下,把接收数据和相应的错误处理代码可以都集中在一起:

	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh: // 往 server 端写数据异常
			if debugRoundTrip {
				req.logf("writeErrCh resv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %v", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pc.closech: // 连接关闭异常
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer: // 读请求头超时
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc: // 正常地从 response 的 channel 里读到了响应数据
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			return re.res, nil
		case <-cancelChan: // 用户侧通过 context 取消了流程
			pc.t.CancelRequest(req.Request)
			cancelChan = nil
		case <-ctxDoneChan: // 这个应该意思差不多
			pc.t.cancelRequest(req.Request, req.Context().Err())
			cancelChan = nil
			ctxDoneChan = nil
		}
	}

http2

https://tools.ietf.org/html/rfc7540
https://github.com/bagder/http2-explained

4

http2 协议通过 frame 中的 stream id 对请求和响应进行关联。

http2 可以不等待上一个请求响应后再发下一个请求,因此同一个连接上可以实现 multiplexing。标准库中对于 http2 连接的处理复用了 http1 的连接池逻辑,只不过从连接池中取连接时,并没有真的从连接池里把这个连接拿走。获取到的连接依然保留在 connpool 中。

除此之外,h2 的 connpool 和 h1 的没什么区别。

从 idleConn 数组中获取 idle 连接时:

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
			if delivered {
				if pconn.alt != nil {
					// HTTP/2: multiple clients can share pconn.
					// Leave it in the list.
				} else {
					// HTTP/1: only one client can use pconn.
					// Remove it from the list.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}

把使用完的连接放回连接池时:

	// HTTP/2 (pconn.alt != nil) connections do not come out of the idle list,
	// because multiple goroutines can use them simultaneously.
	// If this is an HTTP/2 connection being “returned,” we're done.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	if pconn.alt == nil {
			// HTTP/1.
			// Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil) {
					done = true
					break
				}
			}
		} else {
			// HTTP/2.
			// Can hand the same pconn to everyone in the waiting list,
			// and we still won't be done: we want to put it in the idle
			// list unconditionally, for any future clients too.
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil)
			}
		}
  • 如果 LRU 列表非空,说明当前没有等待的 goroutine,而在获取 http2 连接时,并没有把连接从连接池中真地拿走,所以直接返回就行了。
  • 如果 LRU 列表为空,这条可能是新建的连接,需要把 waitqueue 弹到空,并把当前这条连接放进连接池。

fasthttp

服务端

请求处理

5

fasthttp 的 server 端使用 worker pool 来进行 goroutine 复用,不会频繁创建新的 g。

workerPool.workerFunc 就是每个 worker 的主循环:

func (wp *workerPool) workerFunc(ch *workerChan) {
	var c net.Conn

	for c = range ch.ch {
		if c == nil {
			break
		}

		wp.WorkerFunc(c)
	}

	wp.lock.Lock()
	wp.workersCount--
	wp.lock.Unlock()
}

每次 serve 新的 conn 时:

  1. 从 workerpool 中获取一个 worker,没有就新建,启动 workerFunc 主循环,监听 worker channel。
  2. 把当前 serve 的新连接发送到 worker channel
  3. workerFunc 获取到新 conn,即开始请求处理流程。执行 fasthttp.Server.serveConn

客户端

连接池

type HostClient struct {
	// Maximum number of connections which may be established to all hosts
	// listed in Addr.
	//
	// You can change this value while the HostClient is being used
	// using HostClient.SetMaxConns(value)
	//
	// DefaultMaxConnsPerHost is used if not set.
	MaxConns int

	// Keep-alive connections are closed after this duration.
	//
	// By default connection duration is unlimited.
	MaxConnDuration time.Duration

	// Idle keep-alive connections are closed after this duration.
	//
	// By default idle connections are closed
	// after DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Maximum number of attempts for idempotent calls
	//
	// DefaultMaxIdemponentCallAttempts is used if not set.
	MaxIdemponentCallAttempts int

    
    // Maximum duration for waiting for a free connection.
	//
	// By default will not waiting, return ErrNoFreeConns immediately
	MaxConnWaitTimeout time.Duration

	clientName  atomic.Value
	lastUseTime uint32

	connsLock  sync.Mutex
	connsCount int
	conns      []*clientConn
	connsWait  *wantConnQueue
}

acquireConn

6

流程比较简单,如果当前 client.conns 数组 > 0,说明有空闲连接,直接取最后一个元素就好,这个元素一般是最近放进去的连接。

releaseConn

func (c *HostClient) releaseConn(cc *clientConn) {
	cc.lastUseTime = time.Now()
	if c.MaxConnWaitTimeout <= 0 {
		c.connsLock.Lock()
		c.conns = append(c.conns, cc)
		c.connsLock.Unlock()
		return
	}

	// try to deliver an idle connection to a *wantConn
	c.connsLock.Lock()
	defer c.connsLock.Unlock()
	delivered := false
	if q := c.connsWait; q != nil && q.len() > 0 {
		for q.len() > 0 {
			w := q.popFront()
			if w.waiting() {
				delivered = w.tryDeliver(cc, nil)
				break
			}
		}
	}
	if !delivered {
		c.conns = append(c.conns, cc)
	}

releaseConn 会先尽量尝试把当前的连接给正在等待连接的请求(wantConn),弹出等待队列(connsWait)的第一个元素。并把连接转交给该请求。如果该请求的状态已经不是 waiting 了,则继续弹出,直到找到了合适的来接盘,或者等待队列弹空。

如果没有顺利地把连接交出去,把当前连接入空闲连接数组(c.conns)。

需要注意 fasthttp 里的 conns 是连接池,clientConnPool 是 clientConn 对象的对象池。

与标准库中的 client 不同的是,fasthttp 中没有 read write loop,所以每个请求是在当前协程中完成的:

  1. 把 request 的 header 和 body 写入到 conn
  2. 从 conn 中读取 response
  3. 释放连接、缓存各种过程中生成的 struct 对象

gRPC

服务端

gRPC 底层基于 http2,所以交互基于 http2 stream,服务端整体流程与 http2 没什么区别。

客户端

在 gRPC 中,客户端没有使用连接池,直接使用了 http2 连接:

Invoke
-> invoke 
-> newClientStream 
-> newAttemptLocked 
-> getTransport 
-> blockingpiker.pick 
->  getReadyTransport 
->  addrConn.connect 
-> go ac.resetTransport()

然后一路走到创建 http2Client。

(dlv) bt
0  0x00000000013e2539 in google.golang.org/grpc/internal/transport.newHTTP2Client
   at /Users/xargin/go/src/google.golang.org/grpc/internal/transport/http2_client.go:167
1  0x000000000145a5ca in google.golang.org/grpc/internal/transport.NewClientTransport
   at /Users/xargin/go/src/google.golang.org/grpc/internal/transport/transport.go:575
2  0x000000000145a5ca in google.golang.org/grpc.(*addrConn).createTransport
   at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:1275
3  0x0000000001459e25 in google.golang.org/grpc.(*addrConn).tryAllAddrs
   at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:1205
4  0x00000000014593b7 in google.golang.org/grpc.(*addrConn).resetTransport
   at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:1120
5  0x000000000105b811 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1357

thrift

thrift 官方没有连接池,client 中生成的 seqid 只是用来和服务端返回的 rseqid 进行匹配。

func (p *TStandardClient) Recv(iprot TProtocol, seqId int32, method string, result TStruct) error {
	rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin()
	if err != nil {
		return err
	}

	if method != rMethod {
		return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method))
	} else if seqId != rSeqId {
		return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method))
	} else if rTypeId == EXCEPTION {
		var exception tApplicationException
		if err := exception.Read(iprot); err != nil {
			return err
		}

		if err := iprot.ReadMessageEnd(); err != nil {
			return err
		}

		return &exception
	} else if rTypeId != REPLY {
		return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method))
	}

	if err := result.Read(iprot); err != nil {
		return err
	}

	return iprot.ReadMessageEnd()
}

thrift 的每个 client 对象中包裹了一个 transport:

	...
	useTransport, err := transportFactory.GetTransport(transport)
	client := NewEchoClientFactory(useTransport, protocolFactory)
	if err := transport.Open(); err != nil {
		fmt.Fprintln(os.Stderr, "Error opening socket to 127.0.0.1:9898", " ", err)
		os.Exit(1)
	}
	defer transport.Close()

	req := &EchoReq{Msg: "You are welcome."}
	res, err := client.Echo(context.TODO(), req)
	...


type EchoClient struct {
	c thrift.TClient
}

func NewEchoClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *EchoClient {
	return &EchoClient{
		c: thrift.NewTStandardClient(f.GetProtocol(t), f.GetProtocol(t)),
	}
}

这个包裹的 transport 就是一条单独的 tcp 连接,没有连接池。

redigo

redigo 是个 client 库,没有服务端:

type Pool struct {
	// Dial is an application supplied function for creating and configuring a
	// connection.
	//
	// The connection returned from Dial must not be in a special state
	// (subscribed to pubsub channel, transaction started, ...).
	Dial func() (Conn, error)

	// DialContext is an application supplied function for creating and configuring a
	// connection with the given context.
	//
	// The connection returned from Dial must not be in a special state
	// (subscribed to pubsub channel, transaction started, ...).
	DialContext func(ctx context.Context) (Conn, error)

	// TestOnBorrow is an optional application supplied function for checking
	// the health of an idle connection before the connection is used again by
	// the application. Argument t is the time that the connection was returned
	// to the pool. If the function returns an error, then the connection is
	// closed.
	TestOnBorrow func(c Conn, t time.Time) error

	// Maximum number of idle connections in the pool.
	MaxIdle int

	// Maximum number of connections allocated by the pool at a given time.
	// When zero, there is no limit on the number of connections in the pool.
	MaxActive int

	// Close connections after remaining idle for this duration. If the value
	// is zero, then idle connections are not closed. Applications should set
	// the timeout to a value less than the server's timeout.
	IdleTimeout time.Duration

	// If Wait is true and the pool is at the MaxActive limit, then Get() waits
	// for a connection to be returned to the pool before returning.
	Wait bool

	// Close connections older than this duration. If the value is zero, then
	// the pool does not close connections based on age.
	MaxConnLifetime time.Duration

	chInitialized uint32 // set to 1 when field ch is initialized

	mu           sync.Mutex    // mu protects the following fields
	closed       bool          // set to true when the pool is closed.
	active       int           // the number of open connections in the pool
	ch           chan struct{} // limits open connections when p.Wait is true
	idle         idleList      // idle connections
	waitCount    int64         // total number of connections waited for.
	waitDuration time.Duration // total time waited for new connections.
}

客户端:

redigo 的客户端需要显式声明并初始化内部的 pool:

func newPool(addr string) *redis.Pool {
    return &redis.Pool{
        MaxIdle: 3,
        IdleTimeout: 240 * time.Second,
        // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
        Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
    }
}

初始化时可以提供 TestOnBorrow 的行为:

pool := &redis.Pool{
  // Other pool configuration not shown in this example.
  TestOnBorrow: func(c redis.Conn, t time.Time) error {
    if time.Since(t) < time.Minute {
      return nil
    }
    _, err := c.Do("PING")
    return err
  },
}

使用时也需要用户显式地 defer Close:

func serveHome(w http.ResponseWriter, r *http.Request) {
    conn := pool.Get()
    defer conn.Close()
    ...
}

pool.Get

7

用户需要设置 pool.Wait 是否等待,如果 Waittrue,则在没有连接可用时,会阻塞等待。如果 Waitfalse,且连接已到达阈值 pool.MaxActive,则直接返回错误 ErrPoolExhausted。

activeConn.Close

func (ac *activeConn) Close() error {
	pc := ac.pc
	if pc == nil {
		return nil
	}
	ac.pc = nil

	if ac.state&connectionMultiState != 0 {
		pc.c.Send("DISCARD")
		ac.state &^= (connectionMultiState | connectionWatchState)
	} else if ac.state&connectionWatchState != 0 {
		pc.c.Send("UNWATCH")
		ac.state &^= connectionWatchState
	}
	if ac.state&connectionSubscribeState != 0 {
		pc.c.Send("UNSUBSCRIBE")
		pc.c.Send("PUNSUBSCRIBE")
		// To detect the end of the message stream, ask the server to echo
		// a sentinel value and read until we see that value.
		sentinelOnce.Do(initSentinel)
		pc.c.Send("ECHO", sentinel)
		pc.c.Flush()
		for {
			p, err := pc.c.Receive()
			if err != nil {
				break
			}
			if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
				ac.state &^= connectionSubscribeState
				break
			}
		}
	}
	pc.c.Do("")
	ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
	return nil
}

close 时会把这个 activeConn 放回连接池。

go-redis/redis

https://github.com/go-redis/redis

这个 redis 库屏蔽了连接池逻辑,用户侧基本不用关心连接,初始化时,传入连接池相关配置:

	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	})

func NewClient(opt *Options) *Client {
	opt.init()

	c := Client{
		baseClient: newBaseClient(opt, newConnPool(opt)),
		ctx:        context.Background(),
	}
	c.cmdable = c.Process

	return &c
}

func newConnPool(opt *Options) *pool.ConnPool {
	return pool.NewConnPool(&pool.Options{
		Dialer: func(ctx context.Context) (net.Conn, error) {
			return opt.Dialer(ctx, opt.Network, opt.Addr)
		},
		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: opt.IdleCheckFrequency,
	})
}
func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		retryTimeout := true
		lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmd(wr, cmd)
			})
			if err != nil {
				return err
			}

			err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
			if err != nil {
				retryTimeout = cmd.readTimeout() == nil
				return err
			}

			return nil
		})
		if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {
			return lastErr
		}
	}
	return lastErr
}

func (c *baseClient) withConn(
	ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
	cn, err := c.getConn(ctx)
	if err != nil {
		return err
	}
	defer func() {
		c.releaseConn(cn, err)
	}()

	err = fn(ctx, cn)
	return err

连接池维护的逻辑和其它库差不多。与其它库不同的是,该库会保证 idle 的 conns 维持在 MinIdleConn 配置数量之上,不足的话,会在后台补充:

func (p *ConnPool) checkMinIdleConns() {
	if p.opt.MinIdleConns == 0 {
		return
	}
	for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
		p.poolSize++
		p.idleConnsLen++
		go func() {
			err := p.addIdleConn()
			if err != nil {
				p.connsMu.Lock()
				p.poolSize--
				p.idleConnsLen--
				p.connsMu.Unlock()
			}
		}()
	}
}

database/sql

这里的连接池与 RPC 系列的稍有区别,取的是 freeConns 的第一个,并且有一个可能效率比较低的 copy 过程:

	// Prefer a free connection, if possible.
	numFree := len(db.freeConn)
	if strategy == cachedOrNewConn && numFree > 0 {
		conn := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		conn.inUse = true
		db.mu.Unlock()
		if conn.expired(lifetime) {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		// Lock around reading lastErr to ensure the session resetter finished.
		conn.Lock()
		err := conn.lastErr
		conn.Unlock()
		if err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		return conn, nil
	

其它的没啥特殊的。

一些连接池相关的总结
Share this

家境贫寒,整理不易,客官如果觉得不错欢迎打赏!我的微信 xargin_buaa,如果想交流也欢迎来加~

京ICP备15065353号-1