epoll原理(Epoll的实现原理分析)

一、接口用法

epoll作为linux下多路复用的一种机制,不仅接口简洁易用,而且性能高效,先简单回顾下3个epoll相关的系统调用:

int epoll_create(int size);

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

  • epoll_create建立一个epoll fd,参数size不再使用。
  • epoll_ctl是事件的注册函数,可以操作上面建立的epoll fd,例如,将刚建立的socket fd加入到epoll中让其监控,或者把 epoll正在监控的某个socket fd移出epoll,不再监控它等等。
  • epoll_wait等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents表示这个events有多大;timeout是超时时间(毫秒,0会立即返回,小于0时将是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

二、为什么高效

1、仅从上面的调用方式就可以看出epoll比select/poll的一个优势:select/poll每次调用都要传递所要监控的所有fd给select/poll系统调用(这意味着每次调用都要将fd列表从用户态拷贝到内核态,当fd数目很多时,这会造成低效)。而每次调用epoll_wait时(作用相当于调用select/poll),不需要再传递fd列表给内核,因为已经在epoll_ctl中将需要监控的fd告诉了内核(epoll_ctl不需要每次都拷贝所有的fd,只需要进行增量式操作)。所以,在调用epoll_create之后,内核已经在内核态开始准备数据结构存放要监控的fd了。每次epoll_ctl只是对这个数据结构进行简单的维护。

2、此外,内核使用了slab机制,为epoll提供了快速的数据结构:在内核里,一切皆文件,所以,epoll向内核注册了一个文件系统,用于存储上述的被监控的fd。当你调用epoll_create时,就会在这个虚拟的epoll文件系统里创建一个file结点。当然这个file不是普通文件,它只服务于epoll。epoll在被内核初始化时(操作系统启动),同时会开辟出epoll自己的内核高速cache区,用于安置每一个我们想监控的fd,这些fd会以红黑树的形式保存在内核cache里,以支持快速的查找、插入、删除。这个内核高速cache区,就是建立连续的物理内存页,然后在之上建立slab层,简单的说,就是物理上分配好你想要的size的内存对象,每次使用时都是使用空闲的已分配好的对象。

3、epoll的第三个优势在于:当我们调用epoll_ctl往里塞入百万个fd时,epoll_wait仍然可以飞快地返回,并有效的将发生事件的fd给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的fd外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的fd,大多一次也只返回很少量的准备就绪fd而已,所以,epoll_wait仅需要从内核态copy少量的fd到用户态而已。那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把fd放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个fd的中断到了,就把它放到准备就绪list链表里。所以,当一个fd(例如socket)上有数据到了,内核在把设备(例如网卡)上的数据copy到内核中后就来把fd(socket)插入到准备就绪list链表里了。

如此,一颗红黑树,一张准备就绪fd链表,少量的内核cache,就帮我们解决了大并发下的fd(socket)处理问题。

1.执行epoll_create时,创建了红黑树和就绪list链表。

2.执行epoll_ctl时,如果增加fd(socket),则检查在红黑树中是否存在,存在立即返回,不存在则添加到红黑树上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪list链表中插入数据。

3.执行epoll_wait时立刻返回准备就绪链表里的数据即可。

三、内部结构及实现

1、创建一个epoll fd(epfd)对应的主要数据结构为:

struct eventpoll {
    spin_lock_t        lock;               //对本数据结构的访问
    struct mutex      mtx;               //防止使用时被删除
    wait_queue_head_t   wq;         /sys_epoll_wait() 使用的等待队列
    wait_queue_head_t   poll_wait;       //file->poll()使用的等待队列
    struct list_head    rdllist;        //事件满足条件的链表
    struct rb_root      rbr;            //用于管理所有fd的红黑树(树根)
    struct epitem      *ovflist;      //将事件到达的fd进行链接起来发送至用户空间
}

struct eventpoll在epoll_create时创建:

long sys_epoll_create(int size) {
    struct eventpoll *ep;
    ... ...
    ep_alloc(&ep); //为ep分配内存并进行初始化
    /* 调用anon_inode_getfd 新建一个file instance,也就是epoll可以看成一个文件(匿名文件)。
    因此我们可以看到epoll_create会返回一个fd。epoll所管理的所有的fd都是放在一个大的结构eventpoll(红黑树)中,
   将主结构体struct eventpoll *ep放入file->private项中进行保存(sys_epoll_ctl会取用)*/
   fd = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));
    return fd;
}

2、上面的原型是epoll的fd所维护的主结构,当向系统中添加一个fd时,就创建一个epitem结构体,并插入至红黑树中。epitem结构体如下:

struct epitem {
    struct rb_node  rbn;          //用于主结构管理的红黑树
    struct list_head  rdllink;     //事件就绪队列
    struct epitem  *next;         //用于主结构体中的链表
    struct epoll_filefd  ffd;      //这个结构体对应的被监听的文件描述符信息
    int  nwait;                          //poll操作中事件的个数
    struct list_head  pwqlist;   //双向链表,保存着被监视文件的等待队列,功能类似于select/poll中的poll_table
    struct eventpoll  *ep;        //该项属于哪个主结构体(多个epitm从属于一个eventpoll)
    struct list_head  fllink;       //双向链表,用来链接被监视的文件描述符对应的struct file。
                                              //因为file里有f_ep_link,用来保存所有监视这个文件的epoll节点
    struct epoll_event  event;  //注册的感兴趣的事件,也就是用户空间的epoll_event
}

epoll用kmem_cache_create(slab分配器)分配内存用来存放struct epitem和struct eppoll_entry, 这个内存在初始化的时候就分配好了。

static int __init eventpoll_init(void) {
   mutex_init(&pmutex);
   ep_poll_safewake_init(&psw);
   epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 0,
                        SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, NULL);
   pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0,
                          EPI_SLAB_DEBUG|SLAB_PANIC, NULL);
   return 0;
}

有了epitem, 剩下的就是插入到epoll管理的红黑树中了,也就是接口epoll_ctl做的事情:

 asmlinkage long sys_epoll_ctl(int epfd,int op,int fd,struct epoll_event __user *event) {
    struct file *file,*tfile;
    struct eventpoll *ep;
    struct epoll_event epds;
    int error = -FAULT;

    //判断参数的合法性,将 __user *event 复制给 epds。
    if(ep_op_has_event(op) && copy_from_user(&epds,event,sizeof(struct epoll_event)))
            goto error_return; //省略跳转到的代码
    
    tfile = fget(fd);    // fd 对应的文件对象
    if(!tfile->f_op || !tfile->f_op->poll)   //不能没有poll驱动
        goto error_tgt_fput;
     
   file  = fget (epfd);  // epoll fd 对应的文件对象
    if(file == tfile || !is_file_poll(file))  //防止自己监听自己
        goto error_tgt_fput;
   
    //在create时存入进去的(anon_inode_getfd),现在取用。
    ep = file->private->data;
    mutex_lock(&ep->mtx);
   
    //防止重复添加(在ep的红黑树中查找是否已经存在这个fd)
    epi = epi_find(ep,tfile,fd);
    switch(op) {
       ... ...
        case EPOLL_CTL_ADD:  //增加监听一个fd
            if(!epi) {
                epds.events |= EPOLLERR | POLLHUP;     //默认包含POLLERR和POLLHUP事件
                 //在ep的红黑树中插入这个fd对应的epitm结构体。
                error = ep_insert(ep, &epds, tfile, fd); 
            } else  //重复添加(在ep的红黑树中查找已经存在这个fd)。
                error = -EEXIST;
            break;
        ... ...
    }
    return error;
}

3、ep_insert的实现如下, 只留主要逻辑:

static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd){
   struct epitem *epi;
   //分配一个epitem结构体来保存每个加入的fd
   if(!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
      goto error_return;

   //初始化该结构体
   ep_rb_initnode(&epi->rbn);
   ep_set_ffd(&epi->ffd,tfile,fd);
   epi->event = *event;
  ... ...
  
  //安装poll回调函数
   init_poll_funcptr(&epq.pt,  ep_ptable_queue_proc );   

   /* 调用poll函数来获取当前事件位,其实是利用它来调用注册函数ep_ptable_queue_proc
   (poll_wait中调用)。如果fd是套接字,f_op为socket_file_ops,poll函数是sock_poll()。
   如果是TCP套接字的话,进而会调用到tcp_poll()函数。此处调用poll函数查看当前文件描述符
   的状态,存储在revents中。在poll的处理函数(tcp_poll())中,会调用sock_poll_wait(),
   在sock_poll_wait()中会调用到epq.pt.qproc指向的函数,也就是ep_ptable_queue_proc()。 */ 
   revents = tfile->f_op->poll(tfile, &epq.pt);

   list_add_tail(&epi->fllink,&tfile->f_ep_lilnks);
   ep_rbtree_insert(ep,epi); //将该epi插入到ep的红黑树中
  
  /*  如果要监视的文件状态已经就绪并且还没有加入到就绪队列中,则将当前的epitem
  加入到就绪队列中.如果有进程正在等待该文件的状态就绪,则唤醒一个等待的进程。 */ 
  ... ...
   return 0;
}

执行f_op->poll(tfile, &epq.pt)时,XXX_poll(tfile, &epq.pt)函数会执行poll_wait(),poll_wait()会调用epq.pt.qproc函数,即ep_ptable_queue_proc。

4、在ep_ptable_queue_proc函数中,引入了另外一个非常重要的数据结构eppoll_entry。eppoll_entry主要完成epitem和epitem事件发生时的callback(ep_poll_callback)函数之间的关联。首先将eppoll_entry的whead指向fd的设备等待队列(同select中的wait_address),然后初始化eppoll_entry的base变量指向epitem,最后通过add_wait_queue将epoll_entry挂载到fd的设备等待队列上。完成这个动作后,epoll_entry已经被挂载到fd的设备等待队列。

/*  在文件操作中的poll函数中调用,将epoll的回调函数加入到目标文件的唤醒队列中。
    如果监视的文件是套接字,参数whead则是sock结构的sk_sleep成员的地址。  */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {
    /* struct ep_queue {
         poll_table pt;  struct epitem *epi;
      } */
    struct epitem *epi = ep_item_from_epqueue(pt); //pt获取struct ep_queue的epi字段。
  
    /*struct eppoll_entry {
         struct list_head llink;
         struct epitem *base;
         wait_queue_t wait;
         wait_queue_head_t *whead;
   }; */
    struct eppoll_entry *pwq;
    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /*如果分配内存失败,则将nwait置为-1,表示
         发生错误,即内存分配失败,或者已发生错误*/
        epi->nwait = -1;
    }
}

5、ep_poll_callback函数主要的功能是将被监视文件的等待事件就绪时,将文件对应的epitem实例添加到就绪队列中,当用户调用epoll_wait()时,内核会将就绪队列中的事件报告给用户。由ep_ptable_queue_proc函数设置了等待队列的ep_poll_callback回调函数。所以在设备硬件数据到来时,硬件中断处理函数中会唤醒该等待队列上等待的进程时,会调用唤醒函数ep_poll_callback

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) {
   struct epitem *epi = ep_item_from_wait(wait);
   struct eventpoll *ep = epi->ep;
  
    ... ... // 一堆检查后
    //***关键***,将该fd加入到epoll监听的就绪链表中
   list_add_tail(&epi->rdllink, &ep->rdllist);

   //唤醒调用epoll_wait()函数时睡眠的进程,用户层epoll_wait 超时前返回。
   if (waitqueue_active(&ep->wq))
      __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
   if (waitqueue_active(&ep->poll_wait))
      pwake++;
  
out_unlock: 
  spin_unlock_irqrestore(&ep->lock, flags);
   if (pwake)
      ep_poll_safewake(&psw, &ep->poll_wait);
   return 1;
}

6、所以ep_poll_callback函数主要的功能是将被监视文件的等待事件就绪时,将文件对应的epitem实例添加到就绪队列中,当用户调用epoll_wait()时,内核会将就绪队列中的事件报告给用户。epoll_wait实现如下:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, 
                              int, maxevents, int, timeout)  {
   struct file *file;
   struct eventpoll *ep;
    /* 检查用户空间传入的events指向的内存是否可写。参见__range_not_ok()。 */
   if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
      error = -EFAULT;  goto error_return;
   }

    /* 获取epfd对应的eventpoll文件的file实例,file结构是在epoll_create中创建。 */
   file = fget(epfd);
    /* 通过检查epfd对应的文件操作是不是eventpoll_fops 来判断epfd是否是一个eventpoll文件 */
   if (!is_file_epoll(file))
      goto error_fput;

    /* At this point it is safe to assume that the "private_data" contains  */
   ep = file->private_data;
    /* Time to fish for events ... */
   error = ep_poll(ep, events, maxevents, timeout);
   return error;
}

epoll_wait调用ep_poll,ep_poll主要是调用ep_send_events向用户空间发送就绪事件。

五,总结

最后以一张关系图作为结束吧:

主要逻辑图

内容底部广告位(手机)
标签:

管理员
草根站长管理员

专注网站优化+网络营销,只做有思想的高价值网站,只提供有担当的营销服务!

上一篇:驺怎么读(一个形容植物和动物长势良好的字,也是诗经中反映召公仁德的字)
下一篇:好物点评团(淘宝怎么做好物点评团?)