Go 实现百万WebSocket连接的方法示例

2020-01-28 13:45:43丽君

现在我们的 netpoll 代码如下:


// 处理 goroutine 池中的轮询事件。
pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
  // 我们在所有 worker 被占用时阻塞 poller
  pool.Schedule(func() {
    Receive(ch)
  })
})

现在我们不仅在套接字中有可读数据时读取,而且还在第一次机会获取池中的空闲 goroutine。??

同样,我们修改 Send()


// 复用 writing goroutine
pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
  if c.noWriterYet() {
    pool.Schedule(ch.writer)
  }
  ch.send <- p
}

取代 go ch.writer() ,我们想写一个复用的 goroutines。因此,对于拥有 N 个 goroutines 的池,我们可以保证同时处理 N 个请求并且在 N + 1 的时候, 我们不会分配 N + 1 个缓冲区。 goroutine 池还允许我们限制新连接的 Accept()Upgrade() ,并避免大多数的 DDoS 攻击。

3.4 upgrade 零拷贝

如前所述,客户端使用 HTTP Upgrade 切换到 WebSocket 协议。这就是 WebSocket 协议的样子:


## HTTP Upgrade 示例

GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket

也就是说,在我们的例子中,需要 HTTP 请求及其 Header 用于切换到 WebSocket 协议。这些知识以及 http.Request 中存储的内容 表明,为了优化,我们需要在处理 HTTP 请求时放弃不必要的内存分配和内存复制,并弃用 net/http 库。

例如, http.Request 有一个与 Header 具有相同名称的字段 ,这个字段用于将数据从连接中复制出来填充请求头。想象一下,该字段需要消耗多少额外内存,例如碰到比较大的 Cookie 头。

WebSocket 的实现

不幸的是,在我们优化的时候所有存在的库都是使用标准的 net/http 库进行升级。而且,(两个)库都不能使用上述的读写优化方案。为了采用这些优化方案,我们需要用一个比较低级的 API 来处理 WebSocket。要重用缓冲区,我们需要把协议函数变成这样:


func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

如果有一个这种 API 的库,我们可以按下面的方式从连接中读取数据包(数据包的写入也一样):


// 预期的 WebSocket 实现API
// getReadBuf, putReadBuf 用来复用 *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)

// 当 conn 中的数据可读取时,readPacket 被调用
func readPacket(conn io.Reader) error {
  buf := getReadBuf()
  defer putReadBuf(buf)

  buf.Reset(conn)
  frame, _ := ReadFrame(buf)
  parsePacket(frame.Payload)
  //...
}