zhenlanghuo's Blog

Golang database/sql与go-sql-driver/mysql 源码阅读笔记 -- database/sql篇

2019/06/05

偶然看见golang之database/sql与go-sql-driver这篇博客,让我有了阅读学习go数据库访问接口和mysql驱动实现的源码的想法和动力。

因为刚好工作负责的组件中就是需要使用mysql,之前就出现过一些问题,但是因为当时对go database/sqlgo-sql-driver/mysql的源码一点都不熟悉,只会用几个api,对于问题出现的原因一头雾水,希望通过阅读学习源码提升能力的同时,以后出现问题可以更加清楚应该怎么解决。


前言

database/sql主要是提供了对数据库的读写操作的接口和数据库连接池功能,需要向其注册相应的数据库驱动才能工作。

go-sql-driver/mysql是mysql数据库驱动,mysql客户端的真正实现,负责与mysql服务端进行交互,完成连接、读写操作。

本文源码版本如下:
database/sql: go1.10.3
go-sql-driver/mysql: v1.4


获取数据库对象

引用golang之database/sql与go-sql-driver中的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
)

func main() {
db, err := sql.Open("mysql", "root:root@/test")
checkErr(err)
rows, err := db.Query("select * from test")
checkErr(err)
var (
id uint64
name string
)
for rows.Next() {
err = rows.Scan(&id, &name)
checkErr(err)
fmt.Println(id, "->", name)
}
}

func checkErr(err error) {
if err != nil {
panic(err)
}
}

通过调用sql.Open来获取一个数据库对象db,然后就可以用这个数据库对象来对数据进行操作了。

直接来看一下sql.Open源码里做了些什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//database/sql/sql.go

var (
driversMu sync.RWMutex
drivers = make(map[string]driver.Driver)
)

/*-------------------------------------------------*/

func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}

if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}

return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

/*-------------------------------------------------*/

func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
resetterCh: make(chan *driverConn, 50),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}

go db.connectionOpener(ctx)
go db.connectionResetter(ctx)

return db
}

sql.Open的工作主要是根据driverName获取相应的数据库驱动对象,然后调用 sql.OpenDB返回数据库对象。

sql.OpenDB的工作是创建数据库对象,同时开启两个协程connectionOpenerconnectionResetterconnectionOpener负责收到请求后新建连接,connectionResetter负责异步重置连接,具体详细的信息后面有机会再说。 

数据库驱动注册

这个时候应该都会好奇有疑问,drivers这个map的值是什么时候设置的,上面也说了需要向database/sql注册相应的数据库驱动才能工作,但是从上面的例子没有看出哪里有进行数据库驱动注册的操作。

打开github.com/go-sql-driver/mysql/driver.go找到init函数,可以发现,原来在导入github.com/go-sql-driver/mysql之后,这个库就自动(偷偷)注册一个mysql数据库驱动到sql.drivers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//github.com/go-sql-driver/mysql/driver.go
func init() {
sql.Register("mysql", &MySQLDriver{})
}

/*--------------------------------------------------------*/
//database/sql/sql.go
func Register(name string, driver driver.Driver) {
driversMu.Lock()
defer driversMu.Unlock()
if driver == nil {
panic("sql: Register driver is nil")
}
if _, dup := drivers[name]; dup {
panic("sql: Register called twice for driver " + name)
}
drivers[name] = driver
}

连接池

sql.Open只是创建了数据库对象(简称:db)以及开启了两个协程,并没有连接数据库的操作,但是直接调用DB.Query就可以进行数据库查询了,这是因为database/sql实现了数据库连接池功能,直接调用DB.Query,会获取一个可用的数据库连接(新建或从连接池中获取),然后调用数据库驱动的接口进行数据库操作。

DB.conn

DB.conn是db内部获取数据库连接的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//database/sql/sql.go
type DB struct {
connector driver.Connector
mu sync.Mutex // protects following fields
freeConn []*driverConn // 空闲连接队列
connRequests map[uint64]chan connRequest
numOpen int // 正在打开连接数(空闲+正在使用)
maxIdle int // 最大空闲连接数
maxOpen int // 最大打开连接数, 0为没有限制
maxLifetime time.Duration // 连接最大生存时间
}

/*--------------------------------------------------------*/

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}

...省去一些代码...

lifetime := db.maxLifetime

// 优先从空闲连接队列中获取
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
...省去一些代码...
return conn, nil
}

// 如果strategy不是cachedOrNewConn,或者空闲连接队列为空,需要创建一个新的连接。
// 创建新连接前先检查最大打开连接数,如果打开的连接数已经到达了限制,创建一个获取连接请求并等待。
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.mu.Unlock()

// Timeout the connection request with the context.
select {
case <-ctx.Done():
...省去一些代码...
return nil, ctx.Err()
case ret, ok := <-req:
...省去一些代码...
return ret.conn, ret.err
}
}

// 创建新连接
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
...省去一些代码...
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}

DB.conn获取连接步骤可以总结为3步:

  1. 如果strategy是cachedOrNewConn,优先从空闲队列中获取
  2. 如果strategy不是cachedOrNewConn,或者空闲队列为空,则先检查最大打开连接数和正在打开连接数,如果正在打开连接数已经到达了限制,创建一个获取连接请求并等待。
  3. 如果正在打开连接数还没有到达限制,直接创建一个新连接。

创建新连接是调用db.connector.Connect进行创建的,先来回顾一下,db.connector是什么,是怎么来的:

1
2
3
4
5
6
7
8
9
10
11
12
//database/sql/sql.go
func Open(driverName, dataSourceName string) (*DB, error) {
...省去一些代码...
if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

查看go-sql-driver/mysql的源码发现,mysql驱动没有实现driver.DriverContext接口,因此实际是创建了dsnConnector

1
2
3
4
//database/sql/sql.go
func (t dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
return t.driver.Open(t.dsn)
}

dsnConnector.Connect调用驱动的Open函数来获取一个数据库驱动对象。

driverConn.releaseConn、 DB.PutConn、DB.putConnDBLocked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//database/sql/sql.go
func (dc *driverConn) releaseConn(err error) {
dc.db.putConn(dc, err, true)
}

/*--------------------------------------------------------*/

func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
db.mu.Lock()
...省去一些代码...
if err == driver.ErrBadConn {
// 如果连接已经是坏的,则丢弃该连接并尝试创建一个新的连接
// 不需要在这里对db.numOpen进行减一,dc.Close最终会调用dc.finalClose对db.numOpen进行减一
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}
...省去一些代码...
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()

if !added {
if resetSession {
dc.Unlock()
}
dc.Close()
return
}
if !resetSession {
return
}
select {
default:
// If the resetterCh is blocking then mark the connection
// as bad and continue on.
dc.lastErr = driver.ErrBadConn
dc.Unlock()
case db.resetterCh <- dc:
}
}

/*--------------------------------------------------------*/

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
return false
}

driverConn.releaseConn函数的作用是释放连接,一般是在数据库操作完成或者数据库事务结束后调用的。

driverConn.releaseConn调用DB.PutConnDB.PutConn调用DB.putConnDBLocked

DB.putConnDBLocked负责丢弃连接、将连接返回给等待的协程或放到空闲队列:

  1. 如果正在打开连接数已经到达限制,丢弃连接,返回fasle
  2. 如果db.connRequests里存在连接请求,则取出其中一个请求,把连接通过channel返回给该请求的协程
  3. 如果db.connRequests里没有连接请求,则检查空闲连接数有没有达到限制,如果没有,就将该连接放到空闲连接队列中,否则丢弃连接,返回false。

DB.maybeOpenNewConnections、 DB.connectionOpener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//database/sql/sql.go
func (db *DB) maybeOpenNewConnections() {
numRequests := len(db.connRequests)
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen
}
}
for numRequests > 0 {
db.numOpen++ // optimistically
numRequests--
if db.closed {
return
}
db.openerCh <- struct{}{}
}
}

DB.maybeOpenNewConnections根据目前的连接请求数量和目前还能打开的连接数量判断是否发送创建连接信号给协程connectionOpener

connectionOpener就是在DB.Open时开启的一个协程,就是用于在出现异常情况的情况下(比如释放的连接是坏连接),进行异步的创建连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//database/sql/sql.go
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}

func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnctions has already executed db.numOpen++ before it sent
// on db.openerCh. This function must execute db.numOpen-- if the
// connection fails or is closed before returning.
ci, err := db.connector.Connect(ctx)
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
if err == nil {
ci.Close()
}
db.numOpen--
return
}
if err != nil {
db.numOpen--
db.putConnDBLocked(nil, err)
// 还没有想明白这里为什么要调用db.maybeOpenNewConnections()
db.maybeOpenNewConnections()
return
}
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}

connectionOpener协程循环监听db.openerCh管道,收到消息就调用DB.openNewConnection创建一个新连接,再通过
DB.putConnDBLocked把连接返回给等待的协程。

总结

database/sql没有专门定义一个连接池结构来实现连接池,连接池的功能融合在了数据库对象DB

这里直接引用golang之database/sql与go-sql-driver中对连接池的总结:

  • 当某个goroutine需要一个连接的时候,首先查看空闲连接数组中是否有可用连接,如果有,则直接从空闲数组中获取;如果空闲数组中没有可用的连接,则需要判断当前打开的连接数是否超出设定的最大值,如果是,则当前goroutine阻塞;如果当前打开的连接数并没有大于设定的最大值,则直接生产一个连接返回;
  • 当某个goroutine结束数据库操作时,将当前使用的连接放入空闲连接数组中,这时需要进行判断,是否有某个goroutine阻塞在获取连接上,如果有,则将当前的连接直接返回给阻塞的goroutine,如果没有goroutine阻塞在获取连接上,则可以直接放入空闲连接数组即可;

另外,下面整理出DB类中主要用来实现连接池的函数:

  • DB.conn:内部获取连接的函数,“连接池”获取连接的入口。
  • driverConn.releaseConn:调用DB.PutConn释放连接,一般在数据库操作完成或事务结束后被调用。
  • DB.PutConn:进行一些处理,调用DB.putConnDBLocked
  • DB.putConnDBLocked:丢弃连接、返回连接给等待的协程或放到空闲队列。
  • DB.maybeOpenNewConnections:根据目前的连接请求数量和目前还能打开的连接数量判断是否发送创建连接信号给协程connectionOpener
  • DB.connectionOpener:负责异步创建连接。

数据库操作

DB.Query & DB.QueryContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
//database/sql/sql.go
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
var rows *Rows
var err error
for i := 0; i < maxBadConnRetries; i++ {
rows, err = db.query(ctx, query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.query(ctx, query, args, alwaysNewConn)
}
return rows, err
}

/*--------------------------------------------------------*/

func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
return db.QueryContext(context.Background(), query, args...)
}

/*--------------------------------------------------------*/

func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}

return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}

/*--------------------------------------------------------*/

func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
queryerCtx, ok := dc.ci.(driver.QueryerContext)
var queryer driver.Queryer
if !ok {
queryer, ok = dc.ci.(driver.Queryer)
}
// 如果驱动有实现QueryerContext或者Queryer接口,则直接执行驱动的相应接口
if ok {
var nvdargs []driver.NamedValue
var rowsi driver.Rows
var err error
withLock(dc, func() {
nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
if err != nil {
return
}
rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
})
if err != driver.ErrSkip {
if err != nil {
releaseConn(err)
return nil, err
}
// Note: ownership of dc passes to the *Rows, to be freed
// with releaseConn.
rows := &Rows{
dc: dc,
releaseConn: releaseConn,
rowsi: rowsi,
}
// 监听context,超时或cancel时调用rows.close
rows.initContextClose(ctx, txctx)
return rows, nil
}
}

// 如果如果驱动没有实现QueryerContext或者Queryer接口,又或者返回了driver.ErrSkip错误
// 则采取先预编译查询语句再传参进行查询的方法
var si driver.Stmt
var err error
withLock(dc, func() {
si, err = ctxDriverPrepare(ctx, dc.ci, query)
})
if err != nil {
releaseConn(err)
return nil, err
}

ds := &driverStmt{Locker: dc, si: si}
rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
if err != nil {
ds.Close()
releaseConn(err)
return nil, err
}

// Note: ownership of ci passes to the *Rows, to be freed
// with releaseConn.
rows := &Rows{
dc: dc,
releaseConn: releaseConn,
rowsi: rowsi,
closeStmt: ds,
}
// 监听context,超时或cancel时调用rows.close
rows.initContextClose(ctx, txctx)
return rows, nil
}
  • DB.Query其实是调用DB.QueryContext,自动传入一个空的context,因此DB.Query无法通过context设置超时或提前取消。
  • DB.QueryContext调用db.conn来获取连接,获取连接成功后,调用驱动相应的接口进行查询操作:
  1. 如果驱动有实现QueryerContext或者Queryer接口,则直接执行驱动的相应接口
  2. 如果如果驱动没有实现QueryerContext或者Queryer接口,又或者返回了driver.ErrSkip错误,则采取先预编译查询语句再传参进行查询的方法
  • 观察31行代码,调用DB.queryDC时传入了db.releaseConn函数,就是为查询操作结束或者发生异常情况释放连接做准备的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//database/sql/sql.go
func (rs *Rows) initContextClose(ctx, txctx context.Context) {
// 把cancel函数传给rows,使得没有发生context超时或cancel时,rows.close能调用cancel函数结束awaitDone协程
ctx, rs.cancel = context.WithCancel(ctx)
go rs.awaitDone(ctx, txctx)
}

func (rs *Rows) awaitDone(ctx, txctx context.Context) {
var txctxDone <-chan struct{}
if txctx != nil {
txctxDone = txctx.Done()
}
select {
case <-ctx.Done():
case <-txctxDone:
}
rs.close(ctx.Err())
}
  • rows.initContextClose函数会开启一个协程,阻塞监听context的Done事件消息,若发生context超时或cancel事件,调用rows.close
  • 第4行代码,把cancel函数传给rows,使得没有发生context超时或cancel时,rows.close能调用cancel函数结束awaitDone协程

Rows.Next & Rows.NextResultSet & Rows.Scan & Rows.Close

DB.QueryDBQueryContext接口返回的是Rows对象,还需要调用Rows.NextRows.NextResultSetRows.Scan函数进行数据的读取,如下所示:

1
2
3
4
5
6
7
8
9
10
11
rows, _ := db.Query()
defer rows.Close()
for rows.Next() {
rows.Scan()
}
// 如果是多结果集查询
for rows.NextResultSet() {
for rows.Next() {
rows.Scan()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//database/sql/sql.go
func (rs *Rows) Next() bool {
var doClose, ok bool
withLock(rs.closemu.RLocker(), func() {
doClose, ok = rs.nextLocked()
})
if doClose {
rs.Close()
}
return ok
}

/*--------------------------------------------------------*/

func (rs *Rows) nextLocked() (doClose, ok bool) {
if rs.closed {
return false, false
}

rs.dc.Lock()
defer rs.dc.Unlock()

if rs.lastcols == nil {
rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
}

// 调用驱动的rows.Next接口,读取一行数据到rs.lastcols
rs.lasterr = rs.rowsi.Next(rs.lastcols)
if rs.lasterr != nil {
// Close the connection if there is a driver error.
if rs.lasterr != io.EOF {
return true, false
}
nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
if !ok {
return true, false
}
// 检查连接的Socket中是否有下一个结果集的数据,如果没有则可以关闭连接
if !nextResultSet.HasNextResultSet() {
doClose = true
}
return doClose, false
}
return false, true
}
  • Rows.nextLocked返回两个值,一个是doClose,一个是ok;doClose表示连接是否关闭了,ok表示是否有读取到一行数据:
  1. 如果正常读到一行数据,则返回doClose=false,ok=true
  2. 如果读数据返回错误,并且错误不是io.EOF,则返回doClose=true,ok=false
  3. 如果返回的错误是io.EOF,表示当前结果集的数据已经读取完,这个时候需要检查该连接中是否还存在其他的结果集数据,如果还有,返回doClose=false,ok=false;如果没有,返回doClose=true,ok=false。
  • Rows.Next调用Rows.nextLocked,根据其返回的doClose判断是否调用Rows.close。注意,Rows.Next返回false只代表已经读取完当前数据集的数据,不代表该次查询返回的数据已经读取完,不代表Rows.Next会自动调用Rows.close;如果该次查询是多语句查询,返回了多个结果集而且连接的socket上还有结果集的数据的话,Rows.nextLocked返回的doClose为false,Rows.Next就不会调用Rows.close。可以通过调用Rows.NextResultSet判断是否还存在下一个数据集数据并获取下一个数据集结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//database/sql/sql.go
func (rs *Rows) Scan(dest ...interface{}) error {
rs.closemu.RLock()
if rs.closed {
rs.closemu.RUnlock()
return errors.New("sql: Rows are closed")
}
rs.closemu.RUnlock()

if rs.lastcols == nil {
return errors.New("sql: Scan called without calling Next")
}
if len(dest) != len(rs.lastcols) {
return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))
}
// 遍历rs.lastcols,给读取到的数据进行格式转换,赋值给传入的指针参数
for i, sv := range rs.lastcols {
err := convertAssign(dest[i], sv)
if err != nil {
return fmt.Errorf("sql: Scan error on column index %d: %v", i, err)
}
}
return nil
}

Rows.Scan的工作是将在Rows.Next中读取到的保存到rs.lastcols数据进行格式转换赋值给传入的指针参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//database/sql/sql.go
func (rs *Rows) Close() error {
return rs.close(nil)
}

func (rs *Rows) close(err error) error {
rs.closemu.Lock()
defer rs.closemu.Unlock()

if rs.closed {
return nil
}
rs.closed = true

if rs.lasterr == nil {
rs.lasterr = err
}

withLock(rs.dc, func() {
// 驱动的close函数
err = rs.rowsi.Close()
})
// 内部测试用的钩子函数
if fn := rowsCloseHook(); fn != nil {
fn(rs, &err)
}
// 结束Rows.awaitDone协程
if rs.cancel != nil {
rs.cancel()
}

if rs.closeStmt != nil {
rs.closeStmt.Close()
}
// 传入的dc.releaseConn函数,释放连接
rs.releaseConn(err)
return err
}
  • Rows.close调用驱动的rowsi.Close,驱动的 rowsi.Close会将该次查询的未读取完的数据都读取完并丢弃。
  • rowsCloseHook是给内部测试用的钩子函数。
  • Rows.close还会调用rs.cancle结束Rows.awaitDone协程。
  • Rows.close最后会调用传入的dc.releaseConn函数来释放连接。

总结:

  1. 正常情况下循环调用Rows.NextRows.Scan直至所有数据读取完成(如果是多结果集查询,还需要调用Rows.NextResultSet),Rows.Next会自动调用Rows.Close
  2. 如果发生了错误或异常,数据没有读取完成,务必调用Rows.Close。如果数据没有读取完成,又没有调用Rows.Close,可能会导致连接损坏或者连接一直不能被释放,关于这一部分的分析,需要在go-sql-driver/mysql篇中结合驱动源码实现来进行具体分析。

DB.Begin & DB.BeginTx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//database/sql/sql.go
func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
return db.beginDC(ctx, dc, dc.releaseConn, opts)
}

func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
withLock(dc, func() {
txi, err = ctxDriverBegin(ctx, opts, dc.ci)
})
if err != nil {
release(err)
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
db: db,
// 该事务对象的所有操作都使用同一个连接
dc: dc,
releaseConn: release,
txi: txi,
// 在tx.Commit或者tx.RollBack中调用,结束tx.awaitDone协程
cancel: cancel,
ctx: ctx,
}
go tx.awaitDone()
return tx, nil
}

func (tx *Tx) awaitDone() {
<-tx.ctx.Done()
// 当context超时时,调用tx.rollback进行事务回滚
// 如果context没有超时,由tx.Commit或者tx.Rollback主动取消context,tx.rollback(true)是无害的
tx.rollback(true)
}
  • 与上面的DB.Query&DB.QueryContext相同,DB.Begin调用DB.BeginTx,传入一个空的context,这里就不再把代码贴出来。
  • DB.BeginContext实际调用DB.beginDB.beginDC,注意在tx结构中会保存连接对象dc,以后使用tx对象的接口进行数据库操作时,都使用该连接。
  • tx.awaitDone的作用和套路跟rows.awatiDone类似,监听context的Done事件消息,在context超时时,调用tx.rollback进行事务回滚;context的cancel函数传给tx,在tx.Committx.Rollback时调用,结束tx.awaitDone协程。

DB.PrepareContext、TX.PrepareContext、Conn.PrepareContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
dc, release, err := tx.grabConn(ctx)
...省去一些代码...
// 把自身赋值给cg参数
stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
...省去一些代码...
return stmt, nil
}

func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
// cg=nil
return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
}

func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
dc, release, err := c.grabConn(ctx)
if err != nil {
return nil, err
}
// 把自身赋值给cg参数
return c.db.prepareDC(ctx, dc, release, c, query)
}

func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
var ds *driverStmt
var err error
defer func() {
release(err)
}()
withLock(dc, func() {
ds, err = dc.prepareLocked(ctx, cg, query)
})
if err != nil {
return nil, err
}
stmt := &Stmt{
db: db,
query: query,
cg: cg,
cgds: ds,
}

// When cg == nil this statement will need to keep track of various
// connections they are prepared on and record the stmt dependency on
// the DB.
if cg == nil {
stmt.css = []connStmt{{dc, ds}}
stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
db.addDep(stmt, stmt)
}
return stmt, nil
}

// stmtConnGrabber represents a Tx or Conn that will return the underlying
// driverConn and release function.
type stmtConnGrabber interface {
// grabConn returns the driverConn and the associated release function
// that must be called when the operation completes.
grabConn(context.Context) (*driverConn, releaseConn, error)

// txCtx returns the transaction context if available.
// The returned context should be selected on along with
// any query context when awaiting a cancel.
txCtx() context.Context
}
  • DB.PrepareContextTX.PrepareContextConn.PrepareContext最终都是调用DB.prepareDC
  • Conn是数据库连接对象,通过DB.Conn获取,Conn也有QueryContext、ExecuteContext、PrepareContext等数据库操作接口。
  • TXConn都实现了stmtConnGrabber接口,因此TXConn在调用DB.prepareDC时都把自身赋值给cg参数,而DB则是把cg参数设为nil,记住这一点,接下来看Stmt.connStmt函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 使用Stmt进行数据库操作时,都需要先调用connStmt获取一个数据库连接
func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
if err = s.stickyErr; err != nil {
return
}
s.mu.Lock()
if s.closed {
s.mu.Unlock()
err = errors.New("sql: statement is closed")
return
}

// In a transaction or connection, we always use the connection that the
// the stmt was created on.
// 如果是通过TX或Conn获取到的Stmt对象,s.cg不为nil,直接用过cg.grabConn获取已经进行过Prepare的数据库连接
if s.cg != nil {
s.mu.Unlock()
dc, releaseConn, err = s.cg.grabConn(ctx) // blocks, waiting for the connection.
if err != nil {
return
}
return dc, releaseConn, s.cgds, nil
}

s.removeClosedStmtLocked()
s.mu.Unlock()

// 从连接池中获取一个数据库连接
dc, err = s.db.conn(ctx, strategy)
if err != nil {
return nil, nil, nil, err
}

// 检查获取到的数据库连接是否已经进行过Prepare
s.mu.Lock()
for _, v := range s.css {
if v.dc == dc {
s.mu.Unlock()
return dc, dc.releaseConn, v.ds, nil
}
}
s.mu.Unlock()

// No luck; we need to prepare the statement on this connection
// 如果获取到的数据库连接没有进行过Prepare,得再进行一下Prepare
withLock(dc, func() {
ds, err = s.prepareOnConnLocked(ctx, dc)
})
if err != nil {
dc.releaseConn(err)
return nil, nil, nil, err
}

return dc, dc.releaseConn, ds, nil
}

TXConn都保存了一个连接,进行数据库操作时都是用同一个连接,而DB进行数据库操作则是每次从连接池中获取一个连接,如果该连接没有进行过该查询/执行语句的Prepare操作的话,需要先进行Prepare操作。因此使用预编译应该使用TX或者Conn,使用DB的接口来进行预编译操作的话,必须是一个很大批量操作才能起到理论上的加速作用,不然反而会增加与数据库服务器之间的通信交互。

CATALOG
  1. 1. 前言
  2. 2. 获取数据库对象
    1. 2.1. 数据库驱动注册
  3. 3. 连接池
    1. 3.1. DB.conn
    2. 3.2. driverConn.releaseConn、 DB.PutConn、DB.putConnDBLocked
    3. 3.3. DB.maybeOpenNewConnections、 DB.connectionOpener
    4. 3.4. 总结
  4. 4. 数据库操作
    1. 4.1. DB.Query & DB.QueryContext
    2. 4.2. Rows.Next & Rows.NextResultSet & Rows.Scan & Rows.Close
    3. 4.3. DB.Begin & DB.BeginTx
    4. 4.4. DB.PrepareContext、TX.PrepareContext、Conn.PrepareContext