然而这还没完,我们的写入新值的操作不光是调用一个api创建socket就完了,还要有一系列的初始化操作,我们必须保证在初始化完成之前,其他通过Load拿到这个实例的协程无法真正访问socket实例。
这时候显然sync.Map自带的机制已经无法解决这个问题了,那么我们必须寻求其他的手段,要么锁,要么就sync.WaitGroup或者whatever的其他什么东西。
解决方案3: 闭包带来的神奇体验
后来经大佬指点,我在encoder.go中看到了这么一段代码:
func typeEncoder(t reflect.Type) encoderFunc {
if fi, ok := encoderCache.Load(t); ok {
return fi.(encoderFunc)
}
// To deal with recursive types, populate the map with an
// indirect func before we build it. This type waits on the
// real func (f) to be ready and then calls it. This indirect
// func is only used for recursive types.
var (
wg sync.WaitGroup
f encoderFunc
)
wg.Add(1)
fi, loaded := encoderCache.LoadOrStore(t, encoderFunc(func(e *encodeState, v reflect.Value, opts encOpts) {
wg.Wait()
f(e, v, opts)
}))
if loaded {
return fi.(encoderFunc)
}
// Compute the real encoder and replace the indirect func with it.
f = newTypeEncoder(t, true)
wg.Done()
encoderCache.Store(t, f)
return f
}
豁然开朗,我们可以在sync.Map中存放一个闭包函数,然后在闭包函数中等待本地的sync.WaitGroup完成再返回实例。于是最终的代码也就成型了。
struct SocketMutex{
sync.Mutex
socket *zmq4.Socket
}
struct SocketMap {
sockets sync.Map
}
func (pushList *SocketMap) push(ip string, data []byte) {
type SocketFunc func()*SocketMutex
var (
socket *SocketMutex
w sync.WaitGroup
)
socket = &SocketMutex {
socket : zmq4.NewSocket()
}
w.Add(1)
socketf, ok = pushList.sockets.LoadOrStore(ip, SocketFunc(func()*SocketMutex) {
w.Wait()
return socket
})
if !ok {
socket = &{
socket: zmq4.NewSocket()
}
//do some initial operation like connect
w.Done()
} else {
socket = socketInter.(*SockeFunc)()
}
socket.Lock()
defer socket.Unlock()
socket.socket.Send(data)
}
总结:
并发代码中的竞争问题,每一行代码的重入性都要深思熟虑啊。
总的来说要保持以下几个准则:
(1) 不可重入访问的系统资源,如socketfd, filefd,signalfd(事实上大多数这种系统资源都是不可重入的)等,在使用无锁结构的容器、读写锁封装的容器时,需要给每个资源单独加锁或者使用其他手段保证系统资源在临界区受到有效保护。
(2)如果有读取,如果为空则写入的逻辑,需要使用能提供原子性保证的LoadOrSave调用,或者没有的话,自己实现也要保证读取和写入过程整体的原子性;防止并发访问Load调用时,多个线程都返回否而创建多个实例,然后在Save的时候又互相覆盖。——这个原则不光对成员是系统资源的时候生效,如果存放的是其他东西也同样适用。









