详解Golang中select的使用与源码分析

2023-01-02 08:57:27
目录
背景select 流程

背景

golang>channel 通信。单个 channel 的通信可以通过一个goroutinechannel 发数据,另外一个从channel取数据进行。这是阻塞的,因为要想顺利执行完这个步骤,需要 channel 准备好才行,准备好的条件如下:

1.发送

    缓存有空间(如果是有缓存的 channel)有等待接收的 goroutine

    2.接收

      缓存有数据(如果是有缓存的 channel)有等待发送的 goroutine

      channel实际使用中还有如下两个需求,这个时候就需要select了。

        同时监听多个channel在没有channel准备好的时候,也可以往下执行。

        select>

        1.空select。作用是阻塞当前goroutine。不要用for{}来阻塞goroutine,因为会占用cpu。而select{}不会,因为当前goroutine不会再被调度。

         if len(cases) == 0 {
                 block()
         }

        2.配置好poll的顺序。由于是同时监听多个channel的发送或者接收,所以需要按照一定的顺序查看哪个channel准备好了。如果每次采用select中的顺序查看channel是否准备好了,那么只要在前面的channel准备好的足够快,那么会造成后面的channel即使准备好了,也永远不会被执行。打乱顺序的逻辑如下,采用了洗牌算法\color{red}{洗牌算法}洗牌算法,注意此过程中会过滤掉channel为nil的case。\color{red}{注意此过程中会过滤掉 channel 为 nil 的 case。}注意此过程中会过滤掉channel为nil的case。

         // generate permuted order
         norder := 0
         for i := range scases {
                 cas := &scases[i]
        
                 // Omit cases without channels from the poll and lock orders.
                 if cas.c == nil {
                         cas.elem = nil // allow GC
                         continue
                 }
        
                 j := fastrandn(uint32(norder + 1))
                 pollorder[norder] = pollorder[j]
                 pollorder[j] = uint16(i)
                 norder++
         }

        3.配置好lock的顺序。由于可能会修改channel中的数据,所以在打算往channel中发送数据或者从channel接收数据的时候,需要锁住 channel。而一个channel可能被多个select监听,如果两个select对两个channel A和B,分别按照顺序A, B和B,A上锁,是可能会造成死锁的,导致两个select都执行不下去。

        所以select中锁住channel的顺序至关重要,解决方案是按照channel的地址的顺序锁住channel。因为在两个selectchannel有交集的时候,都是按照交集中channel的地址顺序锁channel

        实际排序代码如下,采用堆排序算法\color{red}{堆排序算法}堆排序算法按照channel的地址从小到大对channel进行排序。

         // sort the cases by Hchan address to get the locking order.
         // simple heap sort, to guarantee n log n time and constant stack footprint.
         for i := range lockorder {
                 j := i
                 // Start with the pollorder to permute cases on the same channel.
                 c := scases[pollorder[i]].c
                 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
                         k := (j - 1) / 2
                         lockorder[j] = lockorder[k]
                         j = k
                 }
                 lockorder[j] = pollorder[i]
         }
         for i := len(lockorder) - 1; i >= 0; i-- {
                 o := lockorder[i]
                 c := scases[o].c
                 lockorder[i] = lockorder[0]
                 j := 0
                 for {
                         k := j*2 + 1
                         if k >= i {
                                 break
                         }
                         if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
                                 k++
                         }
                         if c.sortkey() < scases[lockorder[k]].c.sortkey() {
                                 lockorder[j] = lockorder[k]
                                 j = k
                                 continue
                         }
                         break
                 }
                 lockorder[j] = o
         }

        4.锁住select中的所有channel。要查看channel中的数据了。

         // lock all the channels involved in the select
         sellock(scases, lockorder)
        

        5.第一轮查看是否已有准备好的channel。如果有直接发送数据到channel或者从channel接收数据。注意selectchannel切片中,前面部分是从channel接收数据的case,后半部分是往channel发送数据的case。

        按照pollorder顺序查看是否有channel准备好了。

         for _, casei := range pollorder {
                 casi = int(casei)
                 cas = &scases[casi]
                 c = cas.c
                 if casi >= nsends {
                         sg = c.sendq.dequeue()
                         if sg != nil {
                                 goto recv
                         }
                         if c.qcount > 0 {
                                 goto bufrecv
                         }
                         if c.closed != 0 {
                                 goto rclose
                         }
                 } else {
                         if raceenabled {
                                 racereadpc(c.raceaddr(), casePC(casi), chansendpc)
                         }
                         if c.closed != 0 {
                                 goto sclose
                         }
                         sg = c.recvq.dequeue()
                         if sg != nil {
                                 goto send
                         }
                         if c.qcount < c.dataqsiz {
                                 goto bufsend
                         }
                 }
         }

        6.直接执行default分支

         if !block {
                 selunlock(scases, lockorder)
                 casi = -1
                 goto retc
         }

        7.第二轮遍历channel。创建sudog把当前goroutine放到每个channel的等待列表中去,等待channel准备好时被唤醒。

         // pass 2 - enqueue on all chans
         gp = getg()
         if gp.waiting != nil {
                 throw("gp.waiting != nil")
         }
         nextp = &gp.waiting
         for _, casei := range lockorder {
                 casi = int(casei)
                 cas = &scases[casi]
                 c = cas.c
                 sg := acquireSudog()
                 sg.g = gp
                 sg.isSelect = true
                 // No stack splits between assigning elem and enqueuing
                 // sg on gp.waiting where copystack can find it.
                 sg.elem = cas.elem
                 sg.releasetime = 0
                 if t0 != 0 {
                         sg.releasetime = -1
                 }
                 sg.c = c
                 // Construct waiting list in lock order.
                 *nextp = sg
                 nextp = &sg.waitlink
        
                 if casi < nsends {
                         c.sendq.enqueue(sg)
                 } else {
                         c.recvq.enqueue(sg)
                 }
         }

        8.等待被唤醒。其中gopark的时候会释放对所有channel占用的锁。

         // wait for someone to wake us up
         gp.param = nil
         // Signal to anyone trying to shrink our stack that we're about
         // to park on a channel. The window between when this G's status
         // changes and when we set gp.activeStackChans is not safe for
         // stack shrinking.
         atomic.Store8(&gp.parkingOnChan, 1)
         gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
         gp.activeStackChans = false
        

        9.被唤醒

          锁住所有channel清理当前goroutine的等待sudog找到是被哪个channel唤醒的,并清理每个channel上当前的goroutine对应的sudog
           sellock(scases, lockorder)
          
           gp.selectDone = 0
           sg = (*sudog)(gp.param)
           gp.param = nil
          
           // pass 3 - dequeue from unsuccessful chans
           // otherwise they stack up on quiet channels
           // record the successful case, if any.
           // We singly-linked up the SudoGs in lock order.
           casi = -1
           cas = nil
           caseSuccess = false
           sglist = gp.waiting
           // Clear all elem before unlinking from gp.waiting.
           for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
                   sg1.isSelect = false
                   sg1.elem = nil
                   sg1.c = nil
           }
           gp.waiting = nil
          
           for _, casei := range lockorder {
                   k = &scases[casei]
                   if sg == sglist {
                           // sg has already been dequeued by the G that woke us up.
                           casi = int(casei)
                           cas = k
                           caseSuccess = sglist.success
                           if sglist.releasetime > 0 {
                                   caseReleaseTime = sglist.releasetime
                           }
                   } else {
                           c = k.c
                           if int(casei) < nsends {
                                   c.sendq.dequeueSudoG(sglist)
                           } else {
                                   c.recvq.dequeueSudoG(sglist)
                           }
                   }
                   sgnext = sglist.waitlink
                   sglist.waitlink = nil
                   releaseSudog(sglist)
                   sglist = sgnext
           }

          到此这篇关于详解Golang中select的使用与源码分析的文章就介绍到这了,更多相关Golang select内容请搜索易采站长站以前的文章或继续浏览下面的相关文章希望大家以后多多支持易采站长站!