「生活可以更简单, 欢迎来到我的开源世界」
  1. 6.1 概述
  2. 6.2 I/O模型
    1. 6.2.1 阻塞式I/O阻塞
    2. 6.2.2 非阻塞式I/O模型
    3. 6.2.3 I/O复用模型
    4. 6.2.4 信号驱动式I/O模型
    5. 6.2.5 异步I/O模型
    6. 6.2.6 各种I/O模型的比较
  3. 6.3 select函数
    1. 6.3.1 描述符就绪条件
    2. 6.3.2 select的最大描述符数
  4. 6.4 str_cli函数(修订版)
  5. 6.5 批量输入
  6. 6.6 shutdown函数
  7. 6.7 str_cli函数(再修订版)
  8. 6.8 TCP回射服务器程序(修订版)
  9. 6.9 pselect函数
  10. 6.10 poll函数
  11. 6.11 TCP回射服务器程序(再修订版)
Unix网络编程-第6章 I/O复用:select和poll函数
2019-12-24
」 「

第6章 I/O复用:select和poll函数

6.1 概述

内核一旦发现进程指定的一个或多个I/O条件就绪(也就是说输入已准备好被读取,或者描述符已能承接更多的输出),它就通知进程,这个能力称为I/O复用。

I/O复用由select和poll两个函数支持,前者较新的称为pselect的POSIX变种。

I/O复用并非只限于网络编程,I/O复用典型使用在下列网络应用场合:

6.2 I/O模型

Unix下可用的I/O模型有五种:

一个输入操作通常包括两个不同的阶段:

  1. 等待数据准备好
  2. 从内核向进程复制数据

对于一个套接字上的输入操作:

  1. 第一步通常涉及等待数据从网络中到达,当所等待的分组到达时,它被复制到内核中的某个缓冲区
  2. 把数据从内核缓冲区复制到应用进程缓冲区

6.2.1 阻塞式I/O阻塞

最流行的I/O模型是阻塞式I/O模型。默认情况下,所有套接字都是阻塞的。

image-20200220142525476

6.2.2 非阻塞式I/O模型

进程把一个套接字设置为非阻塞是在通知内核:当所请求的I/O操作非把本进程投入睡眠才能完成时,不要把本进程投入睡眠,而是返回一个错误。

image-20200220143123892

**轮询(polling)**:应用进程持续轮询内核,以查看某个操作是否就绪,这样做往往耗费大量CPU时间。

6.2.3 I/O复用模型

通过调用select或poll,阻塞在这两个系统调用中的某一个之上,而不是阻塞在真正的I/O调用上

image-20200220211237813

I/O复用需要两个系统调用

优势:可以等待多个描述符就绪

6.2.4 信号驱动式I/O模型

使用信号,让内核在描述符就绪时发送SIGIO信号通知我们,这种模式为信号驱动式I/O。

image-20200220211730610

优势:等待数据报到达期间进程不被阻塞,主循环可以继续执行,只要等待来自信号处理函数的通知:既可以是数据已经准备好被处理,也可以是数据报已准备好被读取

6.2.5 异步I/O模型

异步I/O由POSIX规范定义。

异步函数的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。

与信号驱动模型的主要区别在于:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。

image-20200220222137004

POSIX异步函数以aio_或lio_开头。

这里调用aio_read函数,给内核传递描述符、缓冲区指针、缓冲区大小(与read相同的三个参数)和文件偏移(与lseek类似),并告诉内核当整个操作完成时如何通知我们。该系统调用立即返回,而且在等待I/O完成期间,进程不被阻塞。

6.2.6 各种I/O模型的比较

POSIX定义:

image-20200220222618152

前4种模型都是同步I/O模型,因为其中真正的I/O操作(recvfrom)将进程阻塞。只有异步I/O模型与POSIX定义的异步I/O相匹配。

6.3 select函数

该函数允许进程指示内核等待多个事件中的任何一个发送,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。

select函数告知内核对哪些描述符(就读、写或异常条件)感兴趣以及等待多长时间。描述符不局限于套接字,任何描述符都可以使用select来测试。

#include <sys/select.h>
#include <sys/time.h>

//若有就绪描述符则返回其数目,若超时则返回0,若出错则返回-1
int select(int maxfdp1, fd_set *readset, fd_set *writeset,
fd_set *exceptset, const struct timeval *timeout);

参数解析:

  1. timeout:告知内核等待所指定描述符中的任何一个就绪可花多长时间

    struct timeval{
    long tv_sec; //秒数
    long tv_usec; //微秒数
    }

    该参数有三种情况:

    • 永远等待下去:仅在有一个描述符准备好I/O时才返回,此时将该参数设置为空指针
    • 等待一段固定时间:在有一个描述符准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数
    • 根本不等待:检测描述符后立即返回,这称为轮询。该参数指向timeval结构,其中定时器的值必须为0

    前两种情况的等待通常会被进程在等待期间捕获的信号中断,并从信号处理函数返回。

    timeval结构允许我们指定一个微秒级的分辨率,然而内核支持的真实分辨率往往粗糙得多,许多Unix内核把超时值向上舍入成10ms的倍数,另外内核还需要额外的调度延迟。

    timeout参数的const限定词表示它在函数返回时不会被select修改,无法通过该参数计算出实际等待时间。

  2. 中间三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述符

    • 目前支持的异常条件只有两个:

      1. 某个套接字的带外数据的到达
      2. 某个已置为分组模式的伪终端存在可从其主端读取的控制状态信息
    • select使用描述符集来给3个参数中的每一个参数指定一个或多个描述符值,通常是一个整数数组,其中每个整数中的每一位对应一个描述符。具体实现与应用程序无关,隐藏在数据类型fd_set和以下四个宏中:

      void FD_ZERO(fd_set *fdset);		//clear all bits of fdset
      void FD_SET(int fd, fd_set *fdset); //turn on the bit for fd in fdset
      void FD_CLR(int fd, fd_set *fdset); //turn off the bit for fd in fdset
      void FD_ISSET(int fd, fd_set *fdset);//is the bit for fd on fdset ?

      我们分配一个fd_set类型的描述符集,并用这些宏设置和测试集合中的每一位,也可以使用赋值语句将它赋值成另一个描述符集:

      fd_set rset;

      FD_ZERO(&rset); //初始化
      FD_SET(1, &rset);
      FD_SET(4, &rset);
      FD_SET(5, &rset);
    1. maxfdpl参数指定待测试的描述符的个数,它的值是待测试的最大描述符加1,描述符0,1,2……一直到maxfdpl-1均将被测试
    2. 头文件<sys/select.h>中定义的FD_SETSIZE常值是数据类型fd_set中描述符总数,通常是1024,通常使用不了那么多
    3. select函数修改由指针readset、writeset和exceptset所指向的描述符集,因此这三个参数都是值-结果传参。函数返回时,指示哪些描述符已就绪,描述符集内任何与未就绪描述符对应的位返回时均清零,因此每次重新调用select函数时,都需要重新为关心的位均置1。

6.3.1 描述符就绪条件

满足下列条件之一,则套接字准备好读:

  1. 该套接字的接收缓冲区中的数据字节数大于等于套接字接收缓冲区低水位标记SO_RCVLOWAT的当前大小,对该套接字读操作不阻塞并返回一个大于0的值
  2. 该连接的读半部关闭(也就是接收了FIN的TCP连接),读操作不阻塞并返回0
  3. 该套接字是一个监听套接字,且已完成的连接数不为0,不阻塞
  4. 其上有一个套接字错误待处理,不阻塞并返回-1,同时把设置errno为错误条件。此时可以使用getsockopt来读取和清除该错误

满足下列条件之一,则套接字准备好写:

  1. 该套接字的发送缓冲区中的可用空间字节数大于等于套接字发送缓冲区低水位标记的当前大小,并且该套接字已经连接或者不需要连接(如UDP套接字),对该套接字写操作不阻塞并返回一个大于0的值
  2. 该连接的写半部关闭,对这样的套接字的写操作将产生SIGPIPE信号
  3. 使用非阻塞式connect的套接字已建立连接,或者connect已经以失败告终
  4. 其上有一个套接字错误待处理,不阻塞并返回-1,同时把设置errno为错误条件

如果一个套接字存在带外数据或者仍处于带外标记,那么它有异常条件待处理。当某个套接字上发生错误时,它将由select标记为既可读又可写。

接收低水位标记和发送低水位标记的目的在于:允许应用进程控制在select返回可读或可写条件之前有多少数据可读或有多大空间可用于写。

举例:当数据少于64字节时,应用程序没有任何有效工作可做,则把接收低水位标记设置为64,以防少于64字节的数据准备好时select唤醒程序。

任何UDP套接字只要其发送低水位标记小于等于发送缓冲区大小(默认关系)就总是可写的,这是因为UDP套接字不需要连接。

image-20200805213132250

6.3.2 select的最大描述符数

最初设计select时,操作系统通常对每个进程可用的最大描述符数设置了上限,select就使用了相同的限制。

当今的Unix版本允许每个进程使用事实上无数目限制的描述符(往往仅受限于内存总量和管理性限制)。

表面上可以通过将FD_SETSIZE定义为某个更大的值,实际上却行不通,首先它是内核集成的,修改后需要重新编译内核,其次可能存在扩展性问题。

有些应用程序开始改用poll代替select,典型例子是需要复选大量描述符的事件驱动型服务器程序,所需描述符量超过1024个

6.4 str_cli函数(修订版)

原先版本可能阻塞于fgets调用,新版改为阻塞于select调用,或是等待标准输入可读,或是等待套接字可读。

image-20200805220011258

客户的套接字上的三个条件处理如下:

#include "unp.h"

void
str_cli(FILE *fp, int sockfd){
int maxfdpl;
fd_set rset;
char sendline[MAXLINE], recvline[MAXLINE];

FD_ZERO(&rset);
for( ; ; ){
FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);
//fileno函数把标准I/O文件指针转换为对应的描述符
maxfdpl = max(fileno(fp), sockfd) + 1;
Select(maxfdpl, &rset, NULL, NULL, NULL);

if(FD_ISSET(sockfd, &rset)){ //socket is readable
if(Readline(sockfd, recvline, MAXLINE) == 0)
err_quit("str_cli: server terminated prematurely");
Fputs(recvline, stdout);
}

if(FD_ISSET(fileno(fp), &rset)){ //input is readable
if(Fgets(sendline, MAXLINE, fp) == NULL)
return;
Writen(sockfd, sendline, strlen(sendline));
}
}
}

6.5 批量输入

当客户端使用停-等方式工作时,虽然对交互式使用是合适的,但是却不能实现对通信管道的高效利用。

如果把客户与服务器之间的网络作为全双工管道考虑,请求从客户想服务器发送,应答从服务器向客户发送,则停-等方式如下图:

image-20200806100510176

在Unix的shell环境下,很容易实现重定向标准输入和标准输出,从而可以批量运行客户。当我们把标准输入和标准输出重定向到文件来运行新的客户程序时,却发现输出文件总是小于输入文件(对于回射服务器而言理应相等)。

image-20200806101504809

当运行一个客户程序时,标准输入的EOF同时意味着完成从套接字的读入。批量运行客户程序时,客户程序写完请求时,并不能立即关闭连接,因为管道中还有其它的请求和应答,但是修订版的str_cli函数对标准输入EOF的处理却是返回到main函数,而main函数随后终止。

我们需要的是一种关闭TCP连接其中一半的方法,即给服务器发送一个FIN告诉它我们已经完成了数据发送,但仍然保持套接字描述符打开以便读取。由shutdown函数完成。

为了提升性能而引入的缓冲机制增加了网络应用程序的复杂性:

6.6 shutdown函数

终止网络连接的通常方法是调用close函数,不过close函数有两个限制,却可以使用shutdown来避免:

image-20200806105214830

#include <sys/socket.h>
//成功返回0,出错返回-1
int shutdown(int sockfd, int howto);

函数行为依赖于howto参数的值:

这三个SHUT_XXX名字由POSIX规范定义,howto参数的典型值将会是0(关闭读半部)、1(关闭写半部)和2(读半部和写半部都关闭)。

6.7 str_cli函数(再修订版)

改进(且正确)版本:

#include "unp.h"

void
str_cli(FILE *fp, int sockfd){
int maxfdpl, stdineof;
fd_set rset;
char buf[MAXLINE];
int n;

stdineof = 0;
FD_ZERO(&rset);
for( ; ; ){
if(stdineof == 0)
FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);
//fileno函数把标准I/O文件指针转换为对应的描述符
maxfdpl = max(fileno(fp), sockfd) + 1;
Select(maxfdpl, &rset, NULL, NULL, NULL);

if(FD_ISSET(sockfd, &rset)){ //socket is readable
if( (n = Read(sockfd, buf, MAXLINE)) == 0){
if(stdineof == 1)
return;
else
err_quit("str_cli: server terminated prematurely");
}
Write(fileno(stdout), buf, n)
}

if(FD_ISSET(fileno(fp), &rset)){ //input is readable
if( (n = Read(fileno(fp), buf, MAXLINE)) == 0){
stdineof = 1;
Shutdown(sockfd, SHUT_WR);
FD_CLR(fileno(fp), &rset);
continue;
}
Writen(sockfd, buf, n);
}
}
}

6.8 TCP回射服务器程序(修订版)

使用select来处理任意个客户的单进程程序,而不是为每个客户派生一个子进程。

#include "unp.h"

int
main(int argc, char **argv){
int i, maxi, maxfd, listenfd, connfd, sockfd;
//服务器所能处理的最大客户数目的限制是:
// min[FD_SETSIZE, 内核允许本进程打开的最大描述符数]
//进程能打开的描述符数数目上已经无限制,只受资源和内存限制
int nready, client[FD_SETSIZE];
ssize_t n;
fd_set rset, allset;
char buf[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

Bind(listenfd, (SA*)&servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

//描述符集前三位分别被设置为:标准输入、标准输出和标准错误输出
//select第一个参数为:maxfd+1
maxfd = listenfd;
maxi = -1;
for(i = 0; i < FD_SETSIZE; i++)
client[i] = -1;
FD_ZERO(&allset);
FD_SET(listenfd, &allset);

for( ; ; ){
rset = allset;
nready = Select(maxfd+1, &rset, NULL, NULL, NULL);

if(FD_ISSET(listenfd, &rset)){
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA*) &cliaddr, &clilen);

for(i = 0; i < FD_SETSIZE; i++){
if(client[i] < 0){
client[i] = connfd;
break;
}
}
if(i == FD_SETSIZE)
err_quit("too many clients");
FD_SET(connfd, &allset);
if(connfd > maxfd)
maxfd = connfd;
if(i > maxi)
maxi = i;
if(--nready <= 0)
continue;
}
for(i = 0; i <= maxi; i++){
if((sockfd = client[i]) < 0)
continue;
if(FD_ISSET(sockfd, &rset)){
if((n = Read(sockfd, buf, MAXLINE)) == 0){
Close(sockfd);
FD_CLR(sockfd, &allset);
client[i] = -1;
}else
Writen(sockfd, buf, n);
if(--nready <= 0)
break;
}
}
}
}

面向文本行服务器程序存在一个问题:如果一个恶意的客户连接到服务器,发送一个字节的数据(不是换行符)后进入睡眠,服务器将会调用readline从客户读入这个单字节的数据,然后阻塞于下一个read(readline内部的read)调用,等待客户其它数据。服务器因此阻塞而不能再为其它客户提供服务,直到那个恶意的客户发出一个换行符或终止为止。

当前版本的服务器程序已经弃用面向文本行的方法,等待换行输入或EOF而引起的拒绝服务攻击已经不复存在。

拒绝服务型攻击:当一个服务器在处理多个客户时,它绝对不能阻塞于只与单个客户相关的某个函数调用。否则可能导致服务器被挂起,拒绝为所有其它客户提供服务。

可能的解决办法:

  • 使用非阻塞式I/O
  • 让每个客户由单独的控制线程提供服务
  • 对I/O操作设置一个超时

6.9 pselect函数

pselect函数是由POSIX发明的,如今许多Unix变种支持它

#include <sys/select.h>
#include <signal.h>
#include <time.h>
//若有就绪描述符则返回其数目,超时返回0,出错返回-1
int pselect(int maxfdpl, fd_set *readset,
fd_set *writeset, fd_set *exceptset,
const struct timespec *timeout, const sigset_t *sigmark);

pselect相对于通常的select有两个变化:

6.10 poll函数

poll函数起源于SVR3,最初局限于流设备,SVR4取消了这种限制,允许poll工作在任何描述符上。

poll提供的功能与select类似,不过在处理流设备时,它能够提供额外的信息。

#include <poll.h>

struct pollfd{
int fd;
short events; //指定要测试的条件
short revents;//返回描述符的状态
}
//若有就绪描述符返回其数目,超时返回0,出错返回-1
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);

image-20210122225638690

图分为三个部分:

poll识别三类数据:普通(normal)、优先级带(priority band)、高优先级(high priority)

就TCP和UDP套接字而言,以下条件引起poll返回特定的revent。不幸的是,POSIX在其poll的实现中留了许多空洞(即有多种方法可返回相同的条件):

参数nfds指定结构数组中元素的个数;timeout参数指定poll函数返回前等待多长时间,它是一个指定应等待毫秒数的正值。

image-20200806153002725

INFTIM常值被定义为一个负值,如果系统不能提供毫秒级精度的定时器,该值就向上舍入到最接近的支持值。

两种方法用来(也许只是暂时的)关闭对单个文件描述符的检查,而不需要重新建立整个fds列表:

6.11 TCP回射服务器程序(再修订版)

#include "unp.h"
#include <limits.h> //for OPEN_MAX

int
main(int argc, char **argv){
int i, maxi, listenfd, connfd, sockfd;
int nready;
ssize_t n;
char buf[MAXLINE];
socklen_t clilen;
struct pollfd client[OPEN_MAX];
struct sockaddr_in cliaddr, servaddr;

listenfd = Socket(AF_INET, SOCK_STREAM, 0);

bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);

Bind(listenfd, (SA*)&servaddr, sizeof(servaddr));

Listen(listenfd, LISTENQ);

client[0].fd = listenfd;
client[1].events = POLLRDNORM;
for(i = 1; i < OPEN_MAX; i++)
client[i].fd = -1;
maxi = 0; //含义client数组当前正在使用的最大下标值

for( ; ; ){
nready = Poll(client, maxi + 1, INFTIM);

if(client[0].revents & POLLRDNORM){
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA*) &cliaddr, &clilen);

for(i = 1; i < OPEN_MAX; i++){
if(client[i].fd < 0){
client[i].fd = connfd;
break;
}
}
if(i == OPEN_MAX)
err_quit("too many clients");
client[i].events = POLLRDNORM;
if(i > maxfi)
maxfi = i;
if(i > maxi)
maxi = i;
if(--nready <= 0)
continue;
}
for(i = 1; i <= maxi; i++){
if((sockfd = client[i].fd) < 0)
continue;
if(client[i].revents & (POLLRDNORM | POLLERR)){
if((n = Read(sockfd, buf, MAXLINE)) < 0){
if(errno == ECONNRESET){
Close(sockfd);
client[i].fd = -1;
}else
err_SYS("read error");
}else if(n == 0){
Close(sockfd);
client[i].fd = -1;
}else
Writen(sockfd, buf, n);
if(--nready <= 0)
break;
}
}
}
}
<⇧>