本文共 26681 字,大约阅读时间需要 88 分钟。
epoll模型
在select/poll出现之前我们只能通过read/write的IO操作来从流中读取数据,当然在少量IO操作的时候完全是可靠的,但是当IO操作快速增长时甚至到了大规模并发阶段,这样的IO就显得捉襟见肘了。
首先我们来理解一个内核缓冲区的概念,假设A,B两个分别作为写入方与读出方,假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。
但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。
假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。
然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。
于是再来考虑非阻塞忙轮询的I/O方式,我们发现我们可以同时处理多个流了,我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。
为了避免非阻塞轮询时CPU的空转,引入了select,select会将当前线程阻塞,当有一个或多个IO事件发生时,当前线程就被唤醒,但这时我们仍需遍历一遍所有的流以确定哪些流的事件到达了。也就是说使用select会带来O(n)时间复杂度的轮询时间,epoll可理解为event poll,即epoll将有IO事件发生的流放入一个链表中,并一次性返回到用户空间,把轮询复杂度复杂度降低到了O(1)。
系统调用
int epoll_create(int size);int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); 1 2 3 1 2 3
关于epoll提供的API这里就简单介绍一下:
- epoll_create用于创建一个句柄,自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
- epoll_ctl可以操作上面建立的epoll fd,例如,将刚建立的socket fd加入到epoll中让其监控,或者把 epoll正在监控的某个socket fd移出epoll,不再监控它等等。
- epoll_wait在调用时,在给定的timeout时间内,当在监控的这些文件描述符中的某些文件描述符上有事件发生时,就返回用户态的进程。
深入内部
数据组织
epoll向内核注册了一个文件系统,用于存储上述的被监控的fd。当你调用epoll_create时,就会在这个虚拟的epoll文件系统里创建一个file结点。即epoll用了更适合小内存数据块I/O的slab分配器,这也大大提升了epoll的性能。
epoll有两个个重要的,一个用于存储外来fd的红黑树,用于安置每一个我们想监控的fd,这些fd会以红黑树的形式保存在内核cache里,以支持快速的查找、插入、删除。此外epoll还会再建立一个就绪链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。
有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的fd,大多一次也只返回很少量的准备就绪fd而已,所以,epoll_wait仅需要从内核态copy少量的fd到用户态而已。
实现原理
首先我们先来看一下epoll源代码功能的实现流程图:
接下来我们讨论epoll解决高并发的细节问题:
首先看一下epoll源码内部两个主要的数据结构,第一个是struct eventpoll,每创建一个epollfd, 内核就会分配一个eventpoll与之对应。
struct eventpoll { spinlock_t lock; /* 自旋锁*/ struct mutex mtx; /* 互斥锁*/ wait_queue_head_t wq; /* 调用epoll_wait()时的等待队列*/ wait_queue_head_t poll_wait; /* file->poll()使用的等待队列 */ struct list_head rdllist; /* 所有已经ready的epitem都在这个链表里面 */ struct rb_root rbr; /* 所有要监听的epitem都在这里 */ struct epitem *ovflist; /*这是一个单链表链接着所有的struct epitem当event转移到用户空间时*/ struct user_struct *user; /* 这里保存了一些用户变量, 比如fd监听数量的最大值等等 */}; 1 2 3 4 5 6 7 8 9 10 1 2 3 4 5 6 7 8 9 10
第二个是struct epitem结构,当向系统中添加一个fd时,就创建一个epitem结构体,这是内核管理epoll的基本数据结构。
struct epitem { struct rb_node rbn; /* 用于主结构管理的红黑树 */ struct list_head rdllink; /* 链表节点, 所有已经ready的epitem都会被链到eventpoll的rdllist中 */ struct epitem *next; /* 用于主结构体中的链表 */ struct epoll_filefd ffd; /* 这个结构体对应的被监听的文件描述符信息 */ int nwait; /* poll操作中事件的个数 */ struct list_head pwqlist; /* 双向链表,保存着被监视文件的等待队列,功能类似于select/poll中的poll_table */ struct eventpoll *ep; /* 当前epitem属于哪个eventpoll */ struct list_head fllink; /* 双向链表,用来链接被监视的文件描述符对应的struct file。因为file里有f_ep_link,用来保存所有监视这个文件的epoll节点 */ struct epoll_event event; /* 当前的epitem关系哪些events, 这个数据是调用epoll_ctl时从用户态传递过来 */}; 1 2 3 4 5 6 7 8 9 10 11 1 2 3 4 5 6 7 8 9 10 11
1.执行epoll_create时,创建了红黑树和就绪list链表,struct eventpoll在调用epoll_create时被创建
/* 你没看错, 这就是epoll_create()的真身, 基本啥也不干直接调用epoll_create1了, * 另外你也可以发现, size这个参数其实是没有任何用处的... */SYSCALL_DEFINE1(epoll_create, int, size){ if (size <= 0) return -EINVAL; return sys_epoll_create1(0);}/* 这里是真正的sys_epoll_create*/SYSCALL_DEFINE1(epoll_create1, int, flags){ int error; struct eventpoll *ep = NULL; /* 主描述符 */ ... error = ep_alloc(&ep); /* ep_alloc(struct eventpoll **pep)为pep分配内存,并初始化 */ ... /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure and a free file descriptor. */ /* 这里是创建一个匿名fd, 说起来就话长了...长话短说: * epollfd本身并不存在一个真正的文件与之对应, 所以内核需要创建一个 * "虚拟"的文件, 并为之分配真正的struct file结构, 而且有真正的fd. * 因此我们可以看到epoll_create会返回一个fd. * 这里2个参数比较关键: * eventpoll_fops, fops就是file operations, 就是当你对这个文件(这里是虚拟的)进行操作(比如读)时, * fops里面的函数指针指向真正的操作实现, 类似C++里面虚函数和子类的概念. * epoll只实现了poll和release(就是close)操作, 其它文件系统操作都有VFS全权处理了. * ep, ep就是struct epollevent, 它会作为一个私有数据保存在struct file的private指针里面. * 其实说白了, 就是为了能通过fd找到struct file, 通过struct file能找到eventpoll结构. */ error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC)); ...} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
2.执行epoll_ctl时,如果增加fd(socket),则检查在红黑树中是否存在,存在立即返回,不存在则添加到红黑树上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪list链表中插入数据。
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event){ int error; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; error = -EFAULT; /* * 错误处理以及从用户空间将epoll_event结构copy到内核空间. */ if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event))) goto error_return; /* 取得struct file结构, epfd既然是真正的fd, 那么内核空间 * 就会有与之对于的一个struct file结构 * 这个结构在epoll_create1()中, 由函数anon_inode_getfd()分配 */ file = fget(epfd); /* 我们需要监听的fd, 它当然也有个struct file结构, 上下2个不要搞混了哦 */ tfile = fget(fd); ep = file->private_data; /* 取到我们的eventpoll结构, 来自与epoll_create1()中的分配 */ mutex_lock(&ep->mtx); /* 接下来的操作有可能修改数据结构内容, 加锁 */ /* 对于每一个监听的fd, 内核都有分配一个epitem结构, * 而且我们也知道, epoll是不允许重复添加fd的, * 所以我们首先查找该fd是不是已经存在了. * ep_find()其实就是RBTREE查找, 跟C++STL的map差不多一回事, O(lgn)的时间复杂度. */ epi = ep_find(ep, tfile, fd); switch (op) { /* 首先我们关心添加 */ case EPOLL_CTL_ADD: if (!epi) { /* 之前的find没有找到有效的epitem, 证明是第一次插入, 接受! * 这里我们可以知道, POLLERR和POLLHUP事件内核总是会关心的 * */ epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); /* rbtree插入 */ } else /* 重复添加 */ error = -EEXIST; break; /* 下面删除和修改操作就不再介绍 */ case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; } mutex_unlock(&ep->mtx); /* 释放前面的互斥锁 */} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
ep_insert的实现如下:
/* * ep_insert()在epoll_ctl()中被调用, 完成往epollfd里面添加一个监听fd的工作 * tfile是fd在内核态的struct file结构 */static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd){ int error, revents, pwake = 0; unsigned long flags; struct epitem *epi; struct ep_pqueue epq; /* struct ep_queue{ poll_table pt; struct epitem *epi; } */ /* 查看是否达到当前用户的最大监听数 */ if (unlikely(atomic_read(&ep->user->epoll_watches) >= max_user_watches)) return -ENOSPC; /* 从著名的slab中分配一个epitem */ if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM; /* 这些都是相关成员的初始化 */ INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); /* 这里保存了我们需要监听的文件fd和它的file结构 */ epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; epq.epi = epi; /* 初始化一个poll_table * 其实就是指定调用poll_wait(注意不是epoll_wait!!!)时的回调函数,和我们关心哪些events, * ep_ptable_queue_proc()就是我们的回调啦, 初值是所有event都关心 */ init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); /* 这一部很关键, 也比较难懂, 完全是内核的poll机制导致的... * 首先, f_op->poll()一般来说只是个wrapper, 它会调用真正的poll实现, * 拿UDP的socket来举例, 这里就是这样的调用流程: f_op->poll(), sock_poll(), * udp_poll(), datagram_poll(), sock_poll_wait(), 最后调用到我们上面指定的 * ep_ptable_queue_proc()这个回调函数...(好深的调用路径...). * 完成这一步, 我们的epitem就跟这个socket关联起来了, 当它有状态变化时, * 会通过ep_poll_callback()来通知. * 最后, 这个函数还会查询当前的fd是不是已经有啥event已经ready了, 有的话 * 会将event返回. */ revents = tfile->f_op->poll(tfile, &epq.pt); /* 这个就是每个文件会将所有监听自己的epitem链起来 */ spin_lock(&tfile->f_lock); list_add_tail(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_lock); /* 都搞定后, 将epitem插入到对应的eventpoll中去 */ ep_rbtree_insert(ep, epi); spin_lock_irqsave(&ep->lock, flags); /* 到达这里后, 如果我们监听的fd已经有事件发生, 那就要处理一下 */ if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); /* 将当前的epitem加入到ready list中去 */ if (waitqueue_active(&ep->wq)) /* 谁在epoll_wait, 就唤醒它... */ wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) /* 谁在epoll当前的epollfd, 也唤醒它... */ pwake++; } spin_unlock_irqrestore(&ep->lock, flags); atomic_inc(&ep->user->epoll_watches); if (pwake) ep_poll_safewake(&ep->poll_wait); /* 唤醒等待eventpoll文件状态就绪的进程 */ return 0; ...} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
执行f_op->poll(tfile, &epq.pt)时,XXX_poll(tfile, &epq.pt)函数会执行poll_wait(),poll_wait()会调用epq.pt.qproc函数,即ep_ptable_queue_proc。
ep_ptable_queue_proc函数如下:
/* * 该函数在调用f_op->poll()时会被调用. * 也就是epoll主动poll某个fd时, 用来将epitem与指定的fd关联起来的. * 关联的办法就是使用等待队列(waitqueue) */static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt){ struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; /* struct eppoll_entry { struct list_head llink; struct epitem *base; wait_queue_t wait; wait_queue_head_t *whead; }; */ if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { /* 初始化等待队列, 指定ep_poll_callback为唤醒时的回调函数, * 当我们监听的fd发生状态改变时, 也就是队列头被唤醒时, * 指定的回调函数将会被调用. */ init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); /* 将刚分配的等待队列成员加入到头中, 头是由fd持有的 */ list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; /* nwait记录了当前epitem加入到了多少个等待队列中,最大为1 */ } else { epi->nwait = -1; }} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
ep_ptable_queue_proc 函数完成 epitem 加入到特定文件的wait队列任务。
ep_ptable_queue_proc有三个参数:
struct file *file; //该fd对应的文件对象wait_queue_head_t *whead; //该fd对应的设备等待队列(同select中的mydev->wait_address)poll_table *pt; //f_op->poll(tfile, &epq.pt)中的epq.pt 1 2 3 1 2 3
在ep_ptable_queue_proc函数中,引入了另外一个非常重要的数据结构eppoll_entry。eppoll_entry主要完成epitem和epitem事件发生时的callback(ep_poll_callback)函数之间的关联。首先将eppoll_entry的whead指向fd的设备等待队列(同select中的wait_address),然后初始化eppoll_entry的base变量指向epitem,最后通过add_wait_queue将epoll_entry挂载到fd的设备等待队列上。完成这个动作后,epoll_entry已经被挂载到fd的设备等待队列。
其中struct eppoll_entry定义如下:
struct eppoll_entry { struct list_head llink; struct epitem *base; wait_queue_t wait; wait_queue_head_t *whead;}; 1 2 3 4 5 6 1 2 3 4 5 6
由于ep_ptable_queue_proc函数设置了等待队列的ep_poll_callback回调函数。所以在设备硬件数据到来时,硬件中断处理函数中会唤醒该等待队列上等待的进程时,会调用唤醒函数ep_poll_callback
ep_poll_callback函数主要的功能是将被监视文件的等待事件就绪时,将文件对应的epitem实例添加到就绪队列中,当用户调用epoll_wait()时,内核会将就绪队列中的事件报告给用户。
/* * 这个是关键性的回调函数, 当我们监听的fd发生状态改变时, 它会被调用. * 参数key被当作一个unsigned long整数使用, 携带的是events. */static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){ int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); /* 从等待队列获取epitem.需要知道哪个进程挂载到这个设备 */ struct eventpoll *ep = epi->ep; spin_lock_irqsave(&ep->lock, flags); /* * 判断注册的感兴趣事件 * #define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET) * 有非EPOLLONESHONT或EPOLLET事件 */ if (!(epi->event.events & ~EP_PRIVATE_BITS)) goto out_unlock; /* 没有我们关心的event... */ if (key && !((unsigned long) key & epi->event.events)) goto out_unlock; /* * 这里看起来可能有点费解, 其实干的事情比较简单: * 如果该callback被调用的同时, epoll_wait()已经返回了, * 也就是说, 此刻应用程序有可能已经在循环获取events, * 这种情况下, 内核将此刻发生event的epitem用一个单独的链表 * 链起来, 不发给应用程序, 也不丢弃, 而是在下一次epoll_wait * 时返回给用户. */ if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { if (epi->next == EP_UNACTIVE_PTR) { epi->next = ep->ovflist; ep->ovflist = epi; } goto out_unlock; } if (!ep_is_linked(&epi->rdllink)) /* 将当前的epitem放入ready list *///1.先把epitem放入ready list list_add_tail(&epi->rdllink, &ep->rdllist); if (waitqueue_active(&ep->wq)) /* 唤醒epoll_wait... *///2.再将处于epoll_wait睡眠中的进程唤醒 wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) /* 如果epollfd也在被poll, 那就唤醒队列里面的所有成员. */ pwake++;out_unlock: spin_unlock_irqrestore(&ep->lock, flags); if (pwake) ep_poll_safewake(&ep->poll_wait); return 1;} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
ep_poll_callback函数主要的功能是将被监视文件的等待事件就绪时,将文件对应的epitem实例添加到就绪队列中,当用户调用epoll_wait()时,内核会将就绪队列中的事件报告给用户。
3.执行epoll_wait时立刻返回准备就绪链表里的数据即可。
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events, int, maxevents, int, timeout){ int error; struct file *file; struct eventpoll *ep; /* The maximum number of event must be greater than zero */ if (maxevents <= 0 || maxevents > EP_MAX_EVENTS) return -EINVAL; /* 这个地方有必要说明一下: * 内核对应用程序采取的策略是"绝对不信任", * 所以内核跟应用程序之间的数据交互大都是copy, 不允许(也时候也是不能...)指针引用. * epoll_wait()需要内核返回数据给用户空间, 内存由用户程序提供, * 所以内核会用一些手段来验证这一段内存空间是不是有效的. */ if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) { error = -EFAULT; goto error_return; } file = fget(epfd); /* 获取epollfd的struct file, epollfd也是文件嘛 */ if (!file) goto error_return; if (!is_file_epoll(file)) /* 检查一下它是不是一个真正的epollfd... */ goto error_fput; ep = file->private_data; /* 获取eventpoll结构 */ error = ep_poll(ep, events, maxevents, timeout); /* 睡眠, 等待事件到来 */error_fput: fput(file);error_return: return error;} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
epoll_wait调用ep_poll,ep_poll实现如下:
/* 这个函数真正将执行epoll_wait的进程带入睡眠状态... */static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout){ int res, eavail; unsigned long flags; long jtimeout; wait_queue_t wait;//等待队列 /* timeout是以毫秒为单位,这里是要转换为jiffies时间。这里加上999(即1000-1),是为了向上取整。 */ jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ? MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;retry: spin_lock_irqsave(&ep->lock, flags);//加锁,自旋锁用处:也许是因为要访问和内核相关的,或者是唯一的资源:就绪队列等待队列 res = 0; if (list_empty(&ep->rdllist)) { /* 没有事件,所以需要睡眠。当有事件到来时,睡眠会被ep_poll_callback函数唤醒。*/ init_waitqueue_entry(&wait, current); /* 将current进程放在wait这个等待队列中 */ __add_wait_queue_exclusive(&ep->wq, &wait); /* 挂载到ep结构的等待队列 */ for (;;) { set_current_state(TASK_INTERRUPTIBLE); /* 执行ep_poll_callback()唤醒时应当需要将当前进程唤醒,所以当前进程状态应该为“可唤醒”TASK_INTERRUPTIBLE */ if (!list_empty(&ep->rdllist) || !jtimeout) /* 如果就绪队列不为空,也就是说已经有文件的状态为就绪状态,另外一种情况则是进程超时,这两种情况都会退出循环。*/ break; /* 如果有信号产生, 退出循环 */ if (signal_pending(current)) { res = -EINTR; break; } /* 啥事都没有,解锁, 睡觉... */ spin_unlock_irqrestore(&ep->lock, flags); /* * 主动让出处理器,等待ep_poll_callback()将当前进程唤醒或者超时, * 返回值是剩余的时间。从这里开始当前进程会进入睡眠状态, * 直到某些文件的状态就绪或者超时。当文件状态就绪时, * eventpoll的回调函数ep_poll_callback()会唤醒在ep->wq指向的等待队列中的进程。 */ jtimeout = schedule_timeout(jtimeout); spin_lock_irqsave(&ep->lock, flags);//应该为加锁自旋锁 } __remove_wait_queue(&ep->wq, &wait);//这里注意,只有当进程真正从循环中脱离后才会将当前进程从ep->wq指向的等待队列中删除。 set_current_state(TASK_RUNNING);//设置为进程状态为TASK_RUNNING(大意应该是非可唤醒状态) } /* * ep->ovflist链表存储的向用户传递事件时暂存就绪的文件。 * 所以不管是就绪队列ep->rdllist不为空,或者ep->ovflist不等于 * EP_UNACTIVE_PTR,都有可能现在已经有文件的状态就绪。 * ep->ovflist不等于EP_UNACTIVE_PTR有两种情况,一种是NULL,此时 * 可能正在向用户传递事件,不一定就有文件状态就绪, * 一种情况时不为NULL,此时可以肯定有文件状态就绪, * 参见ep_send_events()。 */ eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR; spin_unlock_irqrestore(&ep->lock, flags); /* 如果没有被信号中断,并且有事件就绪,但是没有获取到事件(有可能被其他进程获取到了),并且没有超时,则跳转到retry标签处,重新等待文件状态就绪。如果一切正常, 有event发生, 就开始准备数据copy给用户空间了... */ if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout) goto retry; return res;} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
ep_send_events函数向用户空间发送就绪事件。
ep_send_events()函数将用户传入的内存简单封装到ep_send_events_data结构中,然后调用ep_scan_ready_list() 将就绪队列中的事件传入用户空间的内存。
用户空间访问这个结果,进行处理。
static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents){ struct ep_send_events_data esed; esed.maxevents = maxevents; esed.events = events; return ep_scan_ready_list(ep, ep_send_events_proc, &esed);} 1 2 3 4 5 6 7 8 1 2 3 4 5 6 7 8
下面是ep_scan_ready_list函数:
static int ep_scan_ready_list(struct eventpoll *ep, int (*sproc)(struct eventpoll *, struct list_head *, void *), void *priv){ int error, pwake = 0; unsigned long flags; struct epitem *epi, *nepi; LIST_HEAD(txlist); mutex_lock(&ep->mtx); spin_lock_irqsave(&ep->lock, flags); /* 这一步要注意, 首先, 所有监听到events的epitem都链到rdllist上了, * 但是这一步之后, 所有的epitem都转移到了txlist上, 而rdllist被清空了。 */ list_splice_init(&ep->rdllist, &txlist); /* 此时此刻我们不希望有新的event加入到ready list中了, 保存后下次再处理 */ ep->ovflist = NULL; spin_unlock_irqrestore(&ep->lock, flags); /* * 在这个回调函数里面处理每个epitem * sproc 就是 ep_send_events_proc */ error = (*sproc)(ep, &txlist, priv); spin_lock_irqsave(&ep->lock, flags); /* 现在我们来处理ovflist, 这些epitem都是我们在传递数据给用户空间时 * 监听到了事件. */ for (nepi = ep->ovflist; (epi = nepi) != NULL; nepi = epi->next, epi->next = EP_UNACTIVE_PTR) { /* 将这些直接放入readylist */ if (!ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink, &ep->rdllist); } ep->ovflist = EP_UNACTIVE_PTR; list_splice(&txlist, &ep->rdllist); /* 上一次没有处理完的epitem, 重新插入到ready list */ if (!list_empty(&ep->rdllist)) { /* ready list不为空, 直接唤醒 */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irqrestore(&ep->lock, flags); mutex_unlock(&ep->mtx); if (pwake) ep_poll_safewake(&ep->poll_wait); return error;} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
ep_send_events_proc函数作为callback在ep_scan_ready_list()中被调用,其中head是一个链表, 包含了已经ready的epitem, 这个不是eventpoll里面的ready list, 而是上面函数中的txlist。
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv){ struct ep_send_events_data *esed = priv; int eventcnt; unsigned int revents; struct epitem *epi; struct epoll_event __user *uevent; /* 扫描整个链表 */ for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) { epi = list_first_entry(head, struct epitem, rdllink); /* 取出第一个成员 */ list_del_init(&epi->rdllink); /* 然后从链表里面移除 */ /* * 读取events, * 注意events我们ep_poll_callback()里面已经取过一次了, 为啥还要再取? * 1. 我们当然希望能拿到此刻的最新数据, events是会变的 * 2. 不是所有的poll实现, 都通过等待队列传递了events, 有可能某些驱动压根没传 * 必须主动去读取. */ revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) & epi->event.events; if (revents) { /* 将当前的事件和用户传入的数据都copy给用户空间, * 就是epoll_wait()后应用程序能读到的那一堆数据. */ if (__put_user(revents, &uevent->events) || __put_user(epi->event.data, &uevent->data)) { list_add(&epi->rdllink, head); return eventcnt ? eventcnt : -EFAULT; } eventcnt++; uevent++; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; else if (!(epi->event.events & EPOLLET)) { /* 注意!EPOLLET和非ET的区别就在这一步之差 * 如果是ET, epitem是不会再进入到readly list, * 除非fd再次发生了状态改变, ep_poll_callback被调用. * 如果是非ET, 不管你还有没有有效的事件或者数据, * 都会被重新插入到ready list, 再下一次epoll_wait * 时, 会立即返回, 并通知给用户空间. 当然如果这个 * 被监听的fds确实没事件也没数据了, epoll_wait会返回一个0, * 空转一次. */ list_add_tail(&epi->rdllink, &ep->rdllist); } } } return eventcnt;} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
总结
epoll_create从slab缓存中创建一个eventpoll对象,初始化eventpoll对象时创建了红黑树和就绪list链表并初始化了等待队列,并且创建一个匿名的fd跟fd对应的file对象, 而eventpoll对象保存在struct file结构的private指针中,该fd对应的file operations只是实现了poll跟release操作(其他操作交由VFS来实现)
epoll_ctl首先将epoll_event结构拷贝到内核空间中,并且判断加入的fd是否支持poll结构(epoll,poll,selectI/O多路复用必须支持poll操作),并且从epfd->file->privatedata获取event_poll对象,根据op区分是添加删除还是修改,如果增加fd(socket)则执行插入操作,插入操作时,会创建一个与fd对应的epitem结构,并且初始化相关成员,重要的是指定了调用poll_wait时的回调函数用于数据就绪时唤醒进程,(其内部,初始化设备的等待队列,将该进程注册到等待队列)完成这一步,epitem就跟这个socket关联起来了, 当它有状态变化时, 会通过ep_poll_callback()来通知,最后调用加入的fd的file operation->poll函数(最后会调用poll_wait操作)用于完成注册操作.最后检查在红黑树中是否存在,存在立即返回,不存在则添加到红黑树上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪list链表中插入数据。
epoll_wait(这部分可以看ep_poll以及它后面的)
1.首先先把自己锁住(也许是因为要访问和内核相关的,或者是唯一的资源:就绪队列等待队列)
2.然后换算睡眠时间(timeout是以毫秒为单位,这里是要转换为jiffies时间)
3.判断事件列表是否为空
若不为空:直接转到处理就绪事件的代码 ,跳过循环
若为空:先将当前进程放到wait的等待队列中然后在挂载到ep结构的等待队列中,随后进入循环,
1.将当前进程状态设为可唤醒TASK_INTERRUPTIBLE
2.判断eventopll对象的链表(就绪队列)是否为空,若不为空则说明有文件(fd都为文件描述符)的状态为就绪状态或这进程超时,这两
种状况都会导致退出循环(超时退出循环的话因为就绪列表为空所以接下来的处理应该就不参与了直接返回到用户态)
3.如果有信号产生也会退出循环
4.解锁(让出CPU)
5.调用schedule_timeout睡眠
6.醒来啦,加锁,自旋锁,一个循环结束
4.当进程真正从循环中脱离后才会将当前进程从ep->wq指向的等待队列中删除
5.设置为进程状态为TASK_RUNNING(大意应该是非可唤醒状态)
6.就绪队列不为空的开始它的处理,拷贝资源给用户空间,拷贝资源是先把就绪事件链表转移到中间链表,然后挨个遍历拷贝到用户空间, 并且挨个判断其是否为水平触发,是的话再次插入到就绪链表。如果是超时进程到这里的该干嘛干嘛去返回用户态
参考链接:
转载:http://blog.csdn.net/unknow_cdc/article/details/76637743