result[j] = data[i] ^ stat.maskedData[j % 4];
}
} else {
result = data.slice(stat.index, data.length);
}
this.datas.push(result);
stat.length -= (data.length - stat.index);
//当长度为0,说明当前帧为最后帧
if (stat.length == 0) {
var buf = Buffer.concat(this.datas, stat.totalLength);
if (stat.opcode == 8) {
this.close(buf.toString());
} else {
this.emit("message", buf.toString());
}
this.reset();
}
};
完成了客户端发来的数据解析,还需要一个服务端发数据至客户端的方法,也就是按照上面所说的帧定义来组装数据并且发送出去。下面的代码中基本上每一行都有注释,应该还是比较容易理解的。
//数据发送
WebSocket.prototype.send = function (message) {
if(this.state !== "OPEN") return;
message = String(message);
var length = Buffer.byteLength(message);
// 数据的起始位置,如果数据长度16位也无法描述,则用64位,即8字节,如果16位能描述则用2字节,否则用第二个字节描述
var index = 2 + (length > 65535 ? 8 : (length > 125 ? 2 : 0));
// 定义buffer,长度为描述字节长度 + message长度
var buffer = new Buffer(index + length);
// 第一个字节,fin位为1,opcode为1
buffer[0] = 129;
// 因为是由服务端发至客户端,所以无需masked掩码
if (length > 65535) {
buffer[1] = 127;
// 长度超过65535的则由8个字节表示,因为4个字节能表达的长度为4294967295,已经完全够用,因此直接将前面4个字节置0
buffer.writeUInt32BE(0, 2);
buffer.writeUInt32BE(length, 6);
} else if (length > 125) {
buffer[1] = 126;
// 长度超过125的话就由2个字节表示
buffer.writeUInt16BE(length, 2);
} else {
buffer[1] = length;
}
// 写入正文
buffer.write(message, index);
this.socket.write(buffer);
};最后还要实现一个功能,就是心跳检测:防止服务端长时间不与客户端交互而导致客户端关闭连接,所以每隔十秒都会发送一次ping进行心跳检测
//每隔10秒进行一次心跳检测,若连续发出三次心跳却没收到响应则关闭socket
WebSocket.prototype.checkHeartBeat = function () {
var that = this;
setTimeout(function () {
if (that.state !== "OPEN") return;
if (that.pingTimes >= 3) {
that.close("time out");
return;
}
//记录心跳次数
that.pingTimes++;
that.sendPing();
that.checkHeartBeat();
}, 10000);
};
WebSocket.prototype.sendPing = function () {
this.socket.write(new Buffer(['0x89', '0x0']))
};
WebSocket.prototype.sendPong = function () {
this.socket.write(new Buffer(['0x8A', '0x0']))









