Node.js中的流(Stream)介绍

2020-06-17 06:54:55易采站长站整理


process.stdin.on(‘readable’, function(){
   var buf = process.stdin.read();
   if(buf){
      var data = buf.toString();
      // parsing data …                                               
   }
});

这样调用:


head -10 some.txt | node parser.js

对于 Readable 流,我们还可以订阅它的 data 和 end 事件,以获取数据块并在流枯竭时获得通知,如 经典socket示例 中那样:

req.on(‘connect’, function(res, socket, head) {
    socket.on(‘data’, function(chunk) {
      console.log(chunk.toString());
    });
    socket.on(‘end’, function() {
      proxy.close();
    });
  });

Readable流状态的切换
需要注意的是,Readable 流有两种状态:flowing mode(激流) 和 pause  mode(暂停)。前者根本停不下来,谁被pipe上了就马上不停的给;后者会暂停,直到下游显式的调用 Stream.read() 请求才读取数据块。Readable 流初始化时是 pause mode的。

这两种状态可以互为切换的,其中,

有以下任一行为,pause 转 flowing:

1.对 Readable 流添加一个data事件订阅
2.对 Readable 调用 .resume() 显式开启flowing
3.调用 Readable 流的 .pipe(writable) ,桥接到一个 Writable 流上

有以下任一行为,flowing 转回 pause:

1.Readable 流还没有 pipe 到任何流上,可调 .pause() 暂停
2.Readable 流已经 pipe 到了流上,需 remove 掉所有 data 事件订阅,并且调用 .unpipe()方法逐一解除与下游流的关系

妙用

结合流的异步特性,我可以写出这样的应用:直接将 用户A 的输出桥接到 用户B 的页面上输出:

router.post(‘/post’, function(req, res) {
    var destination = req.headers[‘destination’]; //发给谁
    cache[destionation] = req;
    //是的,并不返回,所以最好是个ajax请求
});

用户B请求的时候:


router.get(‘/inbox’, function(req, res){
    var user = req.headers[‘user’];
    cache.find(user, function(err, previousReq){ //找到之前存的req
       var form = new multiparty.Form();
       form.parse(previousReq);  // 有文件给我