揭开高性能服务器底层面纱

共 38190字,需浏览 77分钟

 ·

2021-08-25 13:55

揭开高性能服务器底层面纱

一、前言

我们经常听说高性能服务器,那什么是高性能服务器;用大白话来解释就是说处理事件快,效率高,占用服务器资源少,多路复用等等集万千宠爱于一身;但是,往往要想做到高性能,这是非常难的,需要一个好的优秀的架构和底层接口。

这篇文章只限于 linux 平台,对于 windows 平台下,可以去参考下 IOCP 的用法,这里就不多说了~

目前主流的高性能服务器底层都是封装了 EPOLL 接口,使用 epoll 进行事件处理,为什么 epoll 可以作为高性能服务器底层事件处理?那就让我们从源码下手,来揭开面纱~

二、源码解读

两个至关重要的结构体

eventpoll结构体:

/*
 * 此结构体存储在file->private_data中
 */
/*
    eventpoll结构体是epoll的核心里面存放着许多信息,主要包括
    1. struct rb_root rbr;这是一颗红黑树的根节点,代表着一颗红黑树,
    红黑树下面挂的是我们感兴趣的socket的事件,当我们调用epoll_ctl向
    epoll添加感兴趣的socket事件时,系统将我们的传递的信息封装成
    struct epitem结构体,然后挂到这颗红黑树的相应节点上
    2.struct list_head rdllist;这是一个双向链表,这个双向链表中存放
    的是就绪的事件当我们调用epoll_wait的时候这些事件会返回给用户
    3.struct file *file;文件结构指针,指向epoll文件
    */
struct eventpoll {
  // 自旋锁,在kernel内部用自旋锁加锁,就可以同时多线(进)程对此结构体进行操作
  // 主要是保护ready_list
  spinlock_t lock;
  // 这个互斥锁是为了保证在eventloop使用对应的文件描述符的时候,文件描述符不会被移除掉
  struct mutex mtx;
  // epoll_wait使用的等待队列,和进程唤醒有关
  wait_queue_head_t wq;
  // file->poll使用的等待队列,和进程唤醒有关
  wait_queue_head_t poll_wait;
  // 就绪的描述符队列,双向链表
  struct list_head rdllist;
  // 通过红黑树来组织当前epoll关注的文件描述符
  struct rb_root rbr;
  // 在向用户空间传输就绪事件的时候,将同时发生事件的文件描述符链入到这个链表里面
  struct epitem *ovflist;
  // 对应的user
  struct user_struct *user;
  // 对应的文件描述符
  struct file *file;
  // 下面两个是用于环路检测的优化
  int visited;
  struct list_head visited_list_link;
};

epitem结构体

// 对应于一个加入到epoll的文件  
struct epitem {  
    // 挂载到eventpoll 的红黑树节点  
    struct rb_node rbn;  
    // 挂载到eventpoll.rdllist 的节点  
    struct list_head rdllink;  
    // 连接到ovflist 的指针  
    struct epitem *next;  
    /* 文件描述符信息fd + file, 红黑树的key */  
    struct epoll_filefd ffd;  
    /* Number of active wait queue attached to poll operations */  
    int nwait;  
    // 当前文件的等待队列(eppoll_entry)列表  
    // 同一个文件上可能会监视多种事件,  
    // 这些事件可能属于不同的wait_queue中  
    // (取决于对应文件类型的实现),  
    // 所以需要使用链表  
    struct list_head pwqlist;  
    // 当前epitem 的所有者  
    struct eventpoll *ep;  
    /* List header used to link this item to the "struct file" items list */  
    struct list_head fllink;  
    /* epoll_ctl 传入的用户数据 */  
    struct epoll_event event;  
};

int epoll_create(int size);

作用:调用epoll_create方法创建一个epoll的句柄

源码:

SYSCALL_DEFINE1(epoll_create, int, size)
{
  if (size <= 0)
    return -EINVAL;

  return do_epoll_create(0);
}

从源码来看,其实 size 这个参数并没有什么作用,只要大于 0 就可以了~

我从其他地方获取资料说的是:以前底层实现是哈希表,现在是红黑树,为了兼容所以才保留了这个参数,也不知道真假,权当了解一下~

接着看下do_epoll_create

static int do_epoll_create(int flags)
{
  int error, fd;
  struct eventpoll *ep = NULL;
  struct file *file;

  /* Check the EPOLL_* constant for consistency.  */
  BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

  if (flags & ~EPOLL_CLOEXEC)
    return -EINVAL;
  /*
   * Create the internal data structure ("struct eventpoll").
   */
  error = ep_alloc(&ep);
  if (error < 0)
    return error;
  /*
   * Creates all the items needed to setup an eventpoll file. That is,
   * a file structure and a free file descriptor.
   */
    // 获取尚未被使用的文件描述符,即描述符数组的槽位
  fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
  if (fd < 0) {
    error = fd;
    goto out_free_ep;
  }
    //创建一个名叫[eventpoll]的文件,并返回其文件结构指针,这个文件代表着epoll实例
  file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
         O_RDWR | (flags & O_CLOEXEC));
  if (IS_ERR(file)) {
    error = PTR_ERR(file);
    goto out_free_fd;
  }
  ep->file = file;
    // 将file填入到对应的文件描述符数组的槽里面
  fd_install(fd, file);
  return fd;

out_free_fd:
  put_unused_fd(fd);
out_free_ep:
  ep_free(ep);
  return error;
}

这里error = ep_alloc(&ep);是分配eventpoll结构并进行的初始化操作;

综上所述,epoll 创建文件的过程,做了初始化和文件关联等;

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

作用:epoll的 事件注册函数

源码:

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
    struct epoll_event __user *, event)
{
  struct epoll_event epds;
  
     //错误处理:如果是删除,且epoll_event结构不为NULL则报错
  //如果是更改或者添加那就需要把从用户空间将epoll_event结构copy到内核空间
  if (ep_op_has_event(op) &&
        // 复制用户空间数据到内核
      copy_from_user(&epds, event, sizeof(struct epoll_event)))
    return -EFAULT;

  return do_epoll_ctl(epfd, op, fd, &epds, false);
}

我们看下函数do_epoll_ctl

int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
     bool nonblock)
{
  int error;
  int full_check = 0;
  struct fd f, tf;
  struct eventpoll *ep;
  struct epitem *epi;
  struct eventpoll *tep = NULL;

    //省略校验过程
  .....
    
  epi = ep_find(ep, tf.file, fd);

  error = -EINVAL;
  switch (op) {
            //增加
  case EPOLL_CTL_ADD:
    if (!epi) {
      epds->events |= EPOLLERR | EPOLLHUP;
      error = ep_insert(ep, epds, tf.file, fd, full_check);
    } else
      error = -EEXIST;
    break;
            //删除
  case EPOLL_CTL_DEL:
    if (epi)
      error = ep_remove(ep, epi);
    else
      error = -ENOENT;
    break;
            //修改
  case EPOLL_CTL_MOD:
    if (epi) {
      if (!(epi->event.events & EPOLLEXCLUSIVE)) {
        epds->events |= EPOLLERR | EPOLLHUP;
        error = ep_modify(ep, epi, epds);
      }
    } else
      error = -ENOENT;
    break;
  }
  if (tep != NULL)
    mutex_unlock(&tep->mtx);
  mutex_unlock(&ep->mtx);

error_tgt_fput:
  if (full_check) {
    clear_tfile_check_list();
    mutex_unlock(&epmutex);
  }

  fdput(tf);
error_fput:
  fdput(f);
error_return:

  return error;
}

do_epoll_ctl函数中,做的更多的是是对文件描述符的校验,然后根据传入的 fd 添加进去并且监视,这里就看一下增加的操作吧~

//往epollfd里面添加一个监听fd
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
         struct file *tfile, int fd, int full_check)
{
  int error, pwake = 0;
  __poll_t revents;
  long user_watches;
  struct epitem *epi;
  struct ep_pqueue epq;

  lockdep_assert_irqs_enabled();

  user_watches = atomic_long_read(&ep->user->epoll_watches);
  if (unlikely(user_watches >= max_user_watches))
    return -ENOSPC;
    //分配和初始化 epi结构体
  if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
    return -ENOMEM;

  /* Item initialization follow here ... */
  INIT_LIST_HEAD(&epi->rdllink);
  INIT_LIST_HEAD(&epi->fllink);
  INIT_LIST_HEAD(&epi->pwqlist);
    //将epoll对象挂载到该fd的epitem结构的ep成员中
  epi->ep = ep;
    //设置被监控的文件描述符及其对应的文件对象到epitem的ffd成员中
  ep_set_ffd(&epi->ffd, tfile, fd);
    //保存fd感兴趣的事件对象
  epi->event = *event;
  epi->nwait = 0;
  epi->next = EP_UNACTIVE_PTR;
  if (epi->event.events & EPOLLWAKEUP) {
    error = ep_create_wakeup_source(epi);
    if (error)
      goto error_create_wakeup_source;
  } else {
    RCU_INIT_POINTER(epi->ws, NULL);
  }

  /* Initialize the poll table using the queue callback */
  epq.epi = epi;
    //将ep_ptable_queue_proc注册到epq.pt中。
  init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

  /*
   * Attach the item to the poll hooks and get current event bits.
   * We can safely use the file* here because its usage count has
   * been increased by the caller of this function. Note that after
   * this operation completes, the poll callback can start hitting
   * the new item.
   */
    //  内部会调用ep_ptable_queue_proc, 在文件对应的wait queue head 上  
    // 注册回调函数, 并返回当前文件的状态 
  revents = ep_item_poll(epi, &epq.pt, 1);

  /*
   * We have to check if something went wrong during the poll wait queue
   * install process. Namely an allocation for a wait queue failed due
   * high memory pressure.
   */
  error = -ENOMEM;
  if (epi->nwait < 0)
    goto error_unregister;

  /* Add the current item to the list of active epoll hook for this file */
    //把epitem插入到f_ep_links链表的尾部
  spin_lock(&tfile->f_lock);
  list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
  spin_unlock(&tfile->f_lock);

  /*
   * Add the current item to the RB tree. All RB tree operations are
   * protected by "mtx", and ep_insert() is called with "mtx" held.
   */
    // 将该epitem插入到ep的红黑树中
  ep_rbtree_insert(ep, epi);

  /* now check if we've created too many backpaths */
  error = -EINVAL;
  if (full_check && reverse_path_check())
    goto error_remove_epi;

  /* We have to drop the new item inside our item list to keep track of it */
  write_lock_irq(&ep->lock);

  /* record NAPI ID of new item if present */
  ep_set_busy_poll_napi_id(epi);

  /* If the file is already "ready" we drop it inside the ready list */
    //如果要监视的文件状态已经就绪并且还没有加入到就绪队列中,则将当前的epitem加入到就绪
  if (revents && !ep_is_linked(epi)) {
    list_add_tail(&epi->rdllink, &ep->rdllist);
    ep_pm_stay_awake(epi);

    /* Notify waiting tasks that events are available */
    if (waitqueue_active(&ep->wq))
            // 通知sys_epoll_wait , 调用回调函数唤醒sys_epoll_wait 进程  
      wake_up(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
      pwake++;
  }

  write_unlock_irq(&ep->lock);

  atomic_long_inc(&ep->user->epoll_watches);

  /* We have to call this outside the lock */
  if (pwake)
    ep_poll_safewake(ep, NULL);

  return 0;

error_remove_epi:
  spin_lock(&tfile->f_lock);
  list_del_rcu(&epi->fllink);
  spin_unlock(&tfile->f_lock);

  rb_erase_cached(&epi->rbn, &ep->rbr);

error_unregister:
  ep_unregister_pollwait(ep, epi);

  /*
   * We need to do this because an event could have been arrived on some
   * allocated wait queue. Note that we don'
t care about the ep->ovflist
   * list, since that is used/cleaned only inside a section bound by "mtx".
   * And ep_insert() is called with "mtx" held.
   */
  write_lock_irq(&ep->lock);
  if (ep_is_linked(epi))
    list_del_init(&epi->rdllink);
  write_unlock_irq(&ep->lock);

  wakeup_source_unregister(ep_wakeup_source(epi));

error_create_wakeup_source:
  kmem_cache_free(epi_cache, epi);

  return error;
}

这里做的更多的是对事件的一个绑定和挂载操作,如果这个 socket 有事件就绪,则会调用ep_poll_callback函数,这个函数负责将事件加入就绪队列并唤醒epoll_wait

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

作用:等待在 epoll 监控的事件中已经发生的事件。

源码;

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
    int, maxevents, int, timeout)
{
  return do_epoll_wait(epfd, events, maxevents, timeout);
}

直接去看下do_epoll_wait函数吧~

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
       int maxevents, int timeout)
{
  int error;
  struct fd f;
  struct eventpoll *ep;

  /* The maximum number of event must be greater than zero */
  if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
    return -EINVAL;

  /* Verify that the area passed by the user is writeable */
  if (!access_ok(events, maxevents * sizeof(struct epoll_event)))
    return -EFAULT;

  /* Get the "struct file *" for the eventpoll file */
    //获取epoll的struct file
  //再通过对应的struct file获得eventpoll 
  f = fdget(epfd);
  if (!f.file)
    return -EBADF;

  /*
   * We have to check that the file structure underneath the fd
   * the user passed to us _is_ an eventpoll file.
   */
  error = -EINVAL;
  if (!is_file_epoll(f.file))
    goto error_fput;

  /*
   * At this point it is safe to assume that the "private_data" contains
   * our own data structure.
   */
    // 根据private_data得到eventpoll结构
  ep = f.file->private_data;

  /* Time to fish for events ... */
    //等待事件的到来
  error = ep_poll(ep, events, maxevents, timeout);

error_fput:
  fdput(f);
  return error;
}

看来核心在 ep_poll 函数呀~去看看吧

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
       int maxevents, long timeout)
{
  int res = 0, eavail, timed_out = 0;
  u64 slack = 0;
  wait_queue_entry_t wait;
  ktime_t expires, *to = NULL;

  lockdep_assert_irqs_enabled();
  //如果就绪链表为空则阻塞直到timeout
  if (timeout > 0) {
    struct timespec64 end_time = ep_set_mstimeout(timeout);

    slack = select_estimate_accuracy(&end_time);
    to = &expires;
    *to = timespec64_to_ktime(end_time);
  //非阻塞
  } else if (timeout == 0) {
    /*
     * Avoid the unnecessary trip to the wait queue loop, if the
     * caller specified a non blocking operation. We still need
     * lock because we could race and not see an epi being added
     * to the ready list while in irq callback. Thus incorrectly
     * returning 0 back to userspace.
     */
    timed_out = 1;

    write_lock_irq(&ep->lock);
    eavail = ep_events_available(ep);
    write_unlock_irq(&ep->lock);

    goto send_events;
  }

fetch_events:
  //是否有就绪事件,或正在扫描处理eventpoll中的rdllist链表
  if (!ep_events_available(ep))
    ep_busy_loop(ep, timed_out);

  eavail = ep_events_available(ep);
  if (eavail)
    goto send_events;

  /*
   * Busy poll timed out.  Drop NAPI ID for now, we can add
   * it back in when we have moved a socket with a valid NAPI
   * ID onto the ready list.
   */
  ep_reset_busy_poll_napi_id(ep);

  do {
    /*
     * Internally init_wait() uses autoremove_wake_function(),
     * thus wait entry is removed from the wait queue on each
     * wakeup. Why it is important? In case of several waiters
     * each new wakeup will hit the next waiter, giving it the
     * chance to harvest new event. Otherwise wakeup can be
     * lost. This is also good performance-wise, because on
     * normal wakeup path no need to call __remove_wait_queue()
     * explicitly, thus ep->lock is not taken, which halts the
     * event delivery.
     */
    init_wait(&wait);

    write_lock_irq(&ep->lock);
    /*
     * Barrierless variant, waitqueue_active() is called under
     * the same lock on wakeup ep_poll_callback() side, so it
     * is safe to avoid an explicit barrier.
     */
        //执行ep_poll_callback()唤醒时应当需要将当前进程唤醒,
    //这就是我们将任务状态设置为TASK_INTERRUPTIBLE的原因。
    __set_current_state(TASK_INTERRUPTIBLE);

    /*
     * Do the final check under the lock. ep_scan_ready_list()
     * plays with two lists (->rdllist and ->ovflist) and there
     * is always a race when both lists are empty for short
     * period of time although events are pending, so lock is
     * important.
     */
    eavail = ep_events_available(ep);
    if (!eavail) {
      if (signal_pending(current))
        res = -EINTR;
      else
        __add_wait_queue_exclusive(&ep->wq, &wait);
    }
    write_unlock_irq(&ep->lock);

    if (eavail || res)
      break;

    if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
      timed_out = 1;
      break;
    }

    /* We were woken up, thus go and try to harvest some events */
    eavail = 1;

  } while (0);
  //醒来
  __set_current_state(TASK_RUNNING);

  if (!list_empty_careful(&wait.entry)) {
    write_lock_irq(&ep->lock);
    __remove_wait_queue(&ep->wq, &wait);
    write_unlock_irq(&ep->lock);
  }

send_events:
  if (fatal_signal_pending(current)) {
    /*
     * Always short-circuit for fatal signals to allow
     * threads to make a timely exit without the chance of
     * finding more events available and fetching
     * repeatedly.
     */
    res = -EINTR;
  }
  /*
   * Try to transfer events to user space. In case we get 0 events and
   * there's still timeout left over, we go trying again in search of
   * more luck.
   */
    /* 如果一切正常, 有event发生, 就开始准备数据copy给用户空间了... */
  if (!res && eavail &&
      !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
    goto fetch_events;

  return res;
}

ep_send_events()函数将用户传入的内存简单封装到ep_send_events_data结构中,然后调用ep_scan_ready_list()将就绪队列中的事件传入用户空间的内存。用户空间访问这个结果,进行处理。

static int ep_send_events(struct eventpoll *ep,
        struct epoll_event __user *events, int maxevents)
{
  struct ep_send_events_data esed;

  esed.maxevents = maxevents;
  esed.events = events;

  ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
  return esed.res;
}
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
             void *priv)
{
  struct ep_send_events_data *esed = priv;
  __poll_t revents;
  struct epitem *epi, *tmp;
  struct epoll_event __user *uevent = esed->events;
  struct wakeup_source *ws;
  poll_table pt;

  init_poll_funcptr(&pt, NULL);
  esed->res = 0;

  /*
   * We can loop without lock because we are passed a task private list.
   * Items cannot vanish during the loop because ep_scan_ready_list() is
   * holding "mtx" during this call.
   */
  lockdep_assert_held(&ep->mtx);

  list_for_each_entry_safe(epi, tmp, head, rdllink) {
    if (esed->res >= esed->maxevents)
      break;

    /*
     * Activate ep->ws before deactivating epi->ws to prevent
     * triggering auto-suspend here (in case we reactive epi->ws
     * below).
     *
     * This could be rearranged to delay the deactivation of epi->ws
     * instead, but then epi->ws would temporarily be out of sync
     * with ep_is_linked().
     */
    ws = ep_wakeup_source(epi);
    if (ws) {
      if (ws->active)
        __pm_stay_awake(ep->ws);
      __pm_relax(ws);
    }

    list_del_init(&epi->rdllink);

    /*
     * If the event mask intersect the caller-requested one,
     * deliver the event to userspace. Again, ep_scan_ready_list()
     * is holding ep->mtx, so no operations coming from userspace
     * can change the item.
     */
    revents = ep_item_poll(epi, &pt, 1);
    if (!revents)
      continue;
    //把当前事件和用户传入的数据copy到用户空间
    if (__put_user(revents, &uevent->events) ||
        __put_user(epi->event.data, &uevent->data)) {
            //复制失败把epi重新插入到ready链表
      list_add(&epi->rdllink, head);
      ep_pm_stay_awake(epi);
      if (!esed->res)
        esed->res = -EFAULT;
      return 0;
    }
    esed->res++;
    uevent++;
    if (epi->event.events & EPOLLONESHOT)
      epi->event.events &= EP_PRIVATE_BITS;
    else if (!(epi->event.events & EPOLLET)) {
      /*
       * If this file has been added with Level
       * Trigger mode, we need to insert back inside
       * the ready list, so that the next call to
       * epoll_wait() will check again the events
       * availability. At this point, no one can insert
       * into ep->rdllist besides us. The epoll_ctl()
       * callers are locked out by
       * ep_scan_ready_list() holding "mtx" and the
       * poll callback will queue them in ep->ovflist.
       */
      list_add_tail(&epi->rdllink, &ep->rdllist);
      ep_pm_stay_awake(epi);
    }
  }

  return 0;
}

看到__put_user就知道,从内核拷贝数据到用户空间使用了__put_user函数,和所谓的共享内存没有一点关系,现在博客上面有很多错误,望大家修正~

三、总结

请大家原谅我水平和时间有限,这次阅读 epoll 的源码起源于在网上看到内核与用户态数据拷贝使用的方法存在争议,所以找来 epoll 的源码进行了粗略的阅读,后续还会抽时间精读一下,不过这次虽然是粗略的阅读了 epoll 的源码,但是收获也很多,接下来就简单做下总结~(这个总结有我自己看源码得来的,也有从网络上搜集的资料,如果错误,请大家不吝赐教)

epoll_create

  • epoll_create传入参数的时候,只要保证参数大于 0 就可以,这个参数时无用的
  • 初始化等待队列和初始化就绪链表,还有初始化红黑树的头结点
  • 分配eventpoll结构并进行的初始化操作;

epoll_ctl

  • epoll_event结构拷贝到内核空间中,并且判断加入的 fd 是否支持 poll 结( epoll,poll,selectI/O 多路复用必须支持 poll 操作).
  • ep = f.file->private_data;获取event_poll对象;
  • 通过 op 判断事件的修改、添加、删除操作
  • 首先在 eventpoll 结构中的红黑树查找是否已经存在了相对应的 fd ,没找到就支持插入操作,否则报重复的错误,还有修改,删除操作。
  • 插入操作时,会创建一个与 fd 对应的 epitem 结构,并且初始化相关成员,并指定调用 poll_wait 时的回调函数用于数据就绪时唤醒进程,(其内部,初始化设备的等待队列,将该进程注册到等待队列)完成这一步,
  • epitem 就跟这个 socket 关联起来了, 当它有状态变化时,会通过ep_poll_callback()来通知.
  • 最后调用加入的 fd 的fileoperation->poll函数(最后会调用 poll_wait 操作)用于完注册操作,将 epitem 结构添加到红黑树中。

epoll_wait

  • 判断 eventpoll 对象的链表是否为空,是否需要操作;初始化一个等待队列,把自己挂上去,设置自己的进程状态
  • 若是可睡眠状态.判断是否有信号到来(有的话直接被中断醒来,),如果没有那就调用 schedule_timeout 进行睡眠,
  • 如果超时或者被唤醒,首先从自己初始化的等待队列删除,然后开始拷贝资源给用户空间了
  • 拷贝资源则是先把就绪事件链表转移到中间链表,然后挨个遍历拷贝到用户空间,并且挨个判断其是否为水平触发,是的话再次插入到就绪链表

用户态和内核态拷贝数据方式

  • 用户态拷贝数据到内核态,是调用了函数:copy_from_user
  • 内核态数据拷贝到用户态,调用了函数:__put_user

这里注意,好多博客上面的说拷贝数据使用的是共享内存,是错误的,千万别信哈~~~~

ET和LT模式不同的原理

else if (!(epi->event.events & EPOLLET)) {
 
        /*
        * If this file has been added with Level
        * Trigger mode, we need to insert back inside
        * the ready list, so that the next call to
        * epoll_wait() will check again the events
        * availability. At this point, no one can insert
        * into ep->rdllist besides us. The epoll_ctl()
        * callers are locked out by
        * ep_scan_ready_list() holding "mtx" and the
        * poll callback will queue them in ep->ovflist.
        */
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);
      }

这里会判断事件类型是否包含了 EPOLLET 位,如果不包含的话就会将该事件对应的 epitem 对象重新加入到 epoll 的 rdllist 链表中,用户态程序下次调用epoll_wait()返回时就又能获取该 epitem 了;等到下一次epoll_wait时, 会立即返回, 并通知给用户空间;

epoll 为什么高效(相比select) 来源:https://www.cnblogs.com/apprentice89/p/3234677.html

  • 仅从上面的调用方式就可以看出 epoll 比 select/poll 的一个优势: select/poll 每次调用都要传递所要监控的所有 fd 给 select/poll 系统调用(这意味着每次调用都要将 fd 列表从用户态拷贝到内核态,当 fd 数目很多时,这会造成低效)。而每次调用 epoll_wait 时(作用相当于调用 select/poll),不需要再传递 fd 列表给内核,因为已经在 epoll_ctl 中将需要监控的 fd 告诉了内核( epoll_ctl 不需要每次都拷贝所有的 fd,只需要进行增量式操作)。所以,在调用 epoll_create 之后,内核已经在内核态开始准备数据结构存放要监控的 fd 了。每次 epoll_ctl 只是对这个数据结构进行简单的维护。
  • 此外,内核使用了slab机制,为epoll提供了快速的数据结构:

在内核里,一切皆文件。所以,epoll 向内核注册了一个文件系统,用于存储上述的被监控的 fd。当你调用 epoll_create 时,就会在这个虚拟的 epoll 文件系统里创建一个 file 结点。当然这个 file 不是普通文件,它只服务于 epoll。epoll 在被内核初始化时(操作系统启动),同时会开辟出 epoll 自己的内核高速 cache 区,用于安置每一个我们想监控的 fd,这些 fd 会以红黑树的形式保存在内核 cache 里,以支持快速的查找、插入、删除。这个内核高速 cache 区,就是建立连续的物理内存页,然后在之上建立 slab 层,简单的说,就是物理上分配好你想要的 size 的内存对象,每次使用时都是使用空闲的已分配好的对象。

  • epoll 的第三个优势在于:当我们调用 epoll_ctl 往里塞入百万个 fd 时,epoll_wait 仍然可以飞快的返回,并有效的将发生事件的 fd 给我们用户。这是由于我们在调用 epoll_create 时,内核除了帮我们在 epoll 文件系统里建了个 file 结点,在内核 cache 里建了个红黑树用于存储以后 epoll_ctl 传来的 fd 外,还会再建立一个 list 链表,用于存储准备就绪的事件,当 epoll_wait 调用时,仅仅观察这个 list 链表里有没有数据即可。有数据就返回,没有数据就 sleep,等到 timeout 时间到后即使链表没数据也返回。所以,epoll_wait 非常高效。而且,通常情况下即使我们要监控百万计的 fd,大多一次也只返回很少量的准备就绪 fd 而已,所以,epoll_wait 仅需要从内核态 copy 少量的 fd 到用户态而已。那么,这个准备就绪 list 链表是怎么维护的呢?当我们执行 epoll_ctl 时,除了把 fd 放到 epoll 文件系统里 file 对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个 fd 的中断到了,就把它放到准备就绪 list 链表里。所以,当一个 fd (例如 socket)上有数据到了,内核在把设备(例如网卡)上的数据 copy 到内核中后就来把 fd(socket)插入到准备就绪 list 链表里了。
浏览 63
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报