2.连接池实现
先看下连接池整体处理流程:

2.1 初始化DB
db, err := sql.Open("mysql", "username:password@tcp(host)/db_name?charset=utf8&allowOldPasswords=1")
sql.Open()是取出对应的db,这时mysql还没有建立连接,只是初始化了一个sql.DB结构,这是非常重要的一个结构,所有相关的数据都保存在此结构中;Open同时启动了一个connectionOpener协程,后面再具体分析其作用。
type DB struct {
driver driver.Driver //数据库实现驱动
dsn string //数据库连接、配置参数信息,比如username、host、password等
numClosed uint64
mu sync.Mutex //锁,操作DB各成员时用到
freeConn []*driverConn //空闲连接
connRequests []chan connRequest //阻塞请求队列,等连接数达到最大限制时,后续请求将插入此队列等待可用连接
numOpen int //已建立连接或等待建立连接数
openerCh chan struct{} //用于connectionOpener
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
maxIdle int //最大空闲连接数
maxOpen int //数据库最大连接数
maxLifetime time.Duration //连接最长存活期,超过这个时间连接将不再被复用
cleanerCh chan struct{}
}
maxIdle(默认值2)、maxOpen(默认值0,无限制)、maxLifetime(默认值0,永不过期)可以分别通过SetMaxIdleConns、SetMaxOpenConns、SetConnMaxLifetime设定。
2.2 获取连接
上面说了Open时是没有建立数据库连接的,只有等用的时候才会实际建立连接,获取可用连接的操作有两种策略:cachedOrNewConn(有可用空闲连接则优先使用,没有则创建)、alwaysNewConn(不管有没有空闲连接都重新创建),下面以一个query的例子看下具体的操作:
rows, err := db.Query("select * from test")
database/sql/sql.go:
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
var rows *Rows
var err error
//maxBadConnRetries = 2
for i := 0; i < maxBadConnRetries; i++ {
rows, err = db.query(query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.query(query, args, alwaysNewConn)
}
return rows, err
}
func (db *DB) query(query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
ci, err := db.conn(strategy)
if err != nil {
return nil, err
}
//到这已经获取到了可用连接,下面进行具体的数据库操作
return db.queryConn(ci, ci.releaseConn, query, args)
}
数据库连接由db.query()获取:
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
lifetime := db.maxLifetime
//从freeConn取一个空闲连接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
//如果没有空闲连接,而且当前建立的连接数已经达到最大限制则将请求加入connRequests队列,
//并阻塞在这里,直到其它协程将占用的连接释放或connectionOpenner创建
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
db.connRequests = append(db.connRequests, req)
db.mu.Unlock()
ret, ok := <-req //阻塞
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) { //连接过期了
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
db.numOpen++ //上面说了numOpen是已经建立或即将建立连接数,这里还没有建立连接,只是乐观的认为后面会成功,失败的时候再将此值减1
db.mu.Unlock()
ci, err := db.driver.Open(db.dsn) //调用driver的Open方法建立连接
if err != nil { //创建连接失败
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections() //通知connectionOpener协程尝试重新建立连接,否则在db.connRequests中等待的请求将一直阻塞,知道下次有连接建立
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
db.addDepLocked(dc, dc)
dc.inUse = true
db.mu.Unlock()
return dc, nil
}










