go语言同步教程之条件变量

2020-01-28 13:14:39刘景俊

其中的Handshake函数代码通过使用条件变量的方式来处理新连接握手调用的同步问题:


func (c *Conn) Handshake() error {
 c.handshakeMutex.Lock()
 defer c.handshakeMutex.Unlock()

 for {
  if err := c.handshakeErr; err != nil {
   return err
  }
  if c.handshakeComplete {
   return nil
  }
  if c.handshakeCond == nil {
   break
  }

  c.handshakeCond.Wait()
 }

 c.handshakeCond = sync.NewCond(&c.handshakeMutex)
 c.handshakeMutex.Unlock()

 c.in.Lock()
 defer c.in.Unlock()

 c.handshakeMutex.Lock()

 if c.handshakeErr != nil || c.handshakeComplete {
  panic("handshake should not have been able to complete after handshakeCond was set")
 }

 if c.isClient {
  c.handshakeErr = c.clientHandshake()
 } else {
  c.handshakeErr = c.serverHandshake()
 }
 if c.handshakeErr == nil {
  c.handshakes++
 } else {
  c.flush()
 }

 if c.handshakeErr == nil && !c.handshakeComplete {
  panic("handshake should have had a result.")
 }

 c.handshakeCond.Broadcast()
 c.handshakeCond = nil

 return c.hand

我们也可以再通过一个例子熟悉sync.Cond的使用:

我们尝试实现一个读写同步的例子,需求是:我们有数个读取器和数个写入器,读取器必须依赖写入器对缓存区进行数据写入后,才可从缓存区中对数据进行读出。我们思考下,要实现类似的功能,除了使用channel,还能如何做?

写入器每次完成写入数据后,它都需要某种通知机制广播给处于阻塞状态的读取器,告诉它们可以对数据进行访问,这其实跟sync.Cond 的 广播机制是不是很像? 有了这个广播机制,我们可以通过sync.Cond来实现这个例子了:


package main

import (
 "bytes"
 "fmt"
 "io"
 "sync"
 "time"
)

type MyDataBucket struct {
 br  *bytes.Buffer
 gmutex *sync.RWMutex
 rcond *sync.Cond //读操作需要用到的条件变量
}

func NewDataBucket() *MyDataBucket {
 buf := make([]byte, 0)
 db := &MyDataBucket{
  br:  bytes.NewBuffer(buf),
  gmutex: new(sync.RWMutex),
 }
 db.rcond = sync.NewCond(db.gmutex.RLocker())
 return db
}

func (db *MyDataBucket) Read(i int) {
 db.gmutex.RLock()
 defer db.gmutex.RUnlock()
 var data []byte
 var d byte
 var err error
 for {
  //读取一个字节
  if d, err = db.br.ReadByte(); err != nil {
   if err == io.EOF {
    if string(data) != "" {
     fmt.Printf("reader-%d: %sn", i, data)
    }
    db.rcond.Wait()
    data = data[:0]
    continue
   }
  }
  data = append(data, d)
 }
}

func (db *MyDataBucket) Put(d []byte) (int, error) {
 db.gmutex.Lock()
 defer db.gmutex.Unlock()
 //写入一个数据块
 n, err := db.br.Write(d)
 db.rcond.Broadcast()
 return n, err
}

func main() {
 db := NewDataBucket()

 go db.Read(1)

 go db.Read(2)

 for i := 0; i < 10; i++ {
  go func(i int) {
   d := fmt.Sprintf("data-%d", i)
   db.Put([]byte(d))
  }(i)
  time.Sleep(100 * time.Millisecond)
 }
}

当使用sync.Cond的时候有两点移动要注意的:

一定要在调用cond.Wait方法前,锁定与之关联的读写锁 一定不要忘记在cond.Wait后,若数据已经处理完毕,在返回前要对与之关联的读写锁进行解锁。