Linux I/O多路复用详解及实例

2019-10-13 18:20:34王旭

int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);

如何让进程同时监视三个管道,其中一个有数据就继续执行而不会sleep,如果全部没有数据流入再sleep,就是多路复用技术需要解决的问题。

非阻塞I/O

非阻塞I/O就是当一个进程试图访问一个I/O设备的时候,无论是否从中获取了请求的数据都会返回并继续执行接下来的任务。,但非常适合请求是否成功对接下来的任务影响不大的I/O请求。但如果访问一个非阻塞I/O,但这个请求如果失败对进程接下来的任务有致命影响,最粗暴的就是使用while(1){read()}轮询。显然,这种方式会占用大量的CPU时间。

select机制

select是一种非常"古老"的同步I/O接口,但是提供了一种很好的I/O多路复用的思路

模型

fd_set   //创建fd_set对象,将来从中增减需要监视的fd
FD_ZERO()  //清空fd_set对象
FD_SET()  //将一个fd加入fd_set对象中 
select()  //监视fd_set对象中的文件描述符
pselect()  //先设定信号屏蔽,再监视
FD_ISSET() //测试fd是否属于fd_set对象
FD_CLR()  //从fd_set对象中删除fd

Note:

select的第一个参数nfds是指集合中的最大的文件描述符+1,因为select会无差别遍历整个文件描述符表直到找到目标,而文件描述符是从0开始的,所以一共是集合中的最大的文件描述符+1次。

上一条导致了这种机制的低效,如果需要监视的文件描述符是0和100那么每一次都会遍历101次

select()每次返回都会修改fd_set,如果要循环select(),需要先对初始的fd_set进行备

例子_I/O多路复用并发服务器

关于server本身的编程模型,参见tcp/ip协议服务器模型和udp/ip协议服务器模型这里仅是使用select实现伪并行的部分模型

#define BUFSIZE 100
#define MAXNFD 1024 

int main()
{
  /***********服务器的listenfd已经准本好了**************/
  fd_set readfds;
  fd_set writefds;
  FD_ZERO(&readfds);
  FD_ZERO(&writefds);
  FD_SET(listenfd, &readfds);

  fd_set temprfds = readfds;
  fd_set tempwfds = writefds;
  int maxfd = listenfd;


  int nready;
  char buf[MAXNFD][BUFSIZE] = {0};
  while(1){
    temprfds = readfds;
    tempwfds = writefds;

    nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL)
    if(FD_ISSET(listenfd, &temprfds)){     
      //如果监听到的是listenfd就进行accept
      int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);
      
      //将新accept的scokfd加入监听集合,并保持maxfd为最大fd
      FD_SET(sockfd, &readfds);
      maxfd = maxfd>sockfd?maxfd:sockfd;
      
      //如果意见检查了nready个fd,就没有必要再等了,直接下一个循环
      if(--nready==0)
        continue;
    }
    
    int fd = 0;
    //遍历文件描述符表,处理接收到的消息
    for(;fd<=maxfd; fd++){  
      if(fd == listenfd)
        continue;

      if(FD_ISSET(fd, &temprfds)){
        int ret = read(fd, buf[fd], sizeof buf[0]);
        if(0 == ret){  //客户端链接已经断开
          close(fd);
          FD_CLR(fd, &readfds);
          if(maxfd==fd) 
            --maxfd;
          continue;
        }
        //将fd加入监听可写的集合
        FD_SET(fd, &writefds); 
      }
      //找到了接收消息的socket的fd,接下来将其加入到监视写的fd_set中
      //将在下一次while()循环开始监视
      if(FD_ISSET(fd, &tempwfds)){
        int ret = write(fd, buf[fd], sizeof buf[0]);
        printf("ret %d: %dn", fd, ret);
        FD_CLR(fd, &writefds);
      }
    }
  }
  close(listenfd);
}