一个普遍的观点是Goroutine是无法被中断的。当一个Goroutine在做conn.Read时,这个协程就被阻塞在那里了。实际上并不是毫无办法的,我们可以通过conn.Close来中断Goroutine。但是在连接池的情况下,又无法Close链接。另外一种做法就是通过SetDeadline为一个过去的时间戳来中断当前正在进行的阻塞读或者阻塞写。
var pool = make(chan net.Conn, 100)
type client struct {
conn net.Conn
inUse *sync.WaitGroup
}
func borrow() (clt *client, err error) {
var conn net.Conn
select {
case conn = <- pool:
default:
conn, err = net.Dial("tcp", "127.0.0.1:18849")
}
if err != nil {
return nil, err
}
clt = &client{
conn: conn,
inUse: &sync.WaitGroup{},
}
return
}
func release(clt *client) error {
clt.conn.SetDeadline(time.Now().Add(-time.Second))
clt.inUse.Done()
clt.inUse.Wait()
select {
case pool <- clt.conn:
// returned to pool
return nil
default:
// pool is overflow
return clt.conn.Close()
}
}
func handle(server *net.TCPConn) {
defer server.Close()
clt, err := borrow()
if err != nil {
fmt.Print(err)
return
}
clt.inUse.Add(1)
defer release(clt)
go func() {
clt.inUse.Add(1)
defer server.Close()
defer release(clt)
buf := make([]byte, 2048)
io.CopyBuffer(server, clt.conn, buf)
}()
buf := make([]byte, 2048)
io.CopyBuffer(clt.conn, server, buf)
}
通过SetDeadline实现了goroutine的中断,然后通过sync.WaitGroup来保证这些使用方都退出了之后再归还给连接池。否则一个连接被复用的时候,之前的使用方可能还没有退出。
连接有效性
为了保证在归还给pool之前,连接仍然是有效的。连接在被读写的过程中如果发现了error,我们就要标记这个连接是有问题的,会释放之后直接close掉。但是SetDeadline必然会导致读取或者写入的时候出现一次timeout的错误,所以还需要把timeout排除掉。
var pool = make(chan net.Conn, 100)
type client struct {
conn net.Conn
inUse *sync.WaitGroup
isValid int32
}
const maybeValid = 0
const isValid = 1
const isInvalid = 2
func (clt *client) Read(b []byte) (n int, err error) {
n, err = clt.conn.Read(b)
if err != nil {
if !isTimeoutError(err) {
atomic.StoreInt32(&clt.isValid, isInvalid)
}
} else {
atomic.StoreInt32(&clt.isValid, isValid)
}
return
}
func (clt *client) Write(b []byte) (n int, err error) {
n, err = clt.conn.Write(b)
if err != nil {
if !isTimeoutError(err) {
atomic.StoreInt32(&clt.isValid, isInvalid)
}
} else {
atomic.StoreInt32(&clt.isValid, isValid)
}
return
}
type timeoutErr interface {
Timeout() bool
}
func isTimeoutError(err error) bool {
timeoutErr, _ := err.(timeoutErr)
if timeoutErr == nil {
return false
}
return timeoutErr.Timeout()
}
func borrow() (clt *client, err error) {
var conn net.Conn
select {
case conn = <- pool:
default:
conn, err = net.Dial("tcp", "127.0.0.1:18849")
}
if err != nil {
return nil, err
}
clt = &client{
conn: conn,
inUse: &sync.WaitGroup{},
isValid: maybeValid,
}
return
}
func release(clt *client) error {
clt.conn.SetDeadline(time.Now().Add(-time.Second))
clt.inUse.Done()
clt.inUse.Wait()
if clt.isValid == isValid {
return clt.conn.Close()
}
select {
case pool <- clt.conn:
// returned to pool
return nil
default:
// pool is overflow
return clt.conn.Close()
}
}
func handle(server *net.TCPConn) {
defer server.Close()
clt, err := borrow()
if err != nil {
fmt.Print(err)
return
}
clt.inUse.Add(1)
defer release(clt)
go func() {
clt.inUse.Add(1)
defer server.Close()
defer release(clt)
buf := make([]byte, 2048)
io.CopyBuffer(server, clt, buf)
}()
buf := make([]byte, 2048)
io.CopyBuffer(clt, server, buf)
}









