Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1851192
  • 博文数量: 184
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 2388
  • 用 户 组: 普通用户
  • 注册时间: 2016-12-21 22:26
个人简介

90后空巢老码农

文章分类

全部博文(184)

文章存档

2021年(26)

2020年(56)

2019年(54)

2018年(47)

2017年(1)

我的朋友

分类: NOSQL

2019-06-23 17:26:02

evport是redis当中I/O多路复用的首选,,代码如下

点击(此处)折叠或打开

  1. #ifdef HAVE_EVPORT
  2. #include "ae_evport.c"
  3. #else
  4.     #ifdef HAVE_EPOLL
  5.     #include "ae_epoll.c"
  6.     #else
  7.         #ifdef HAVE_KQUEUE
  8.         #include "ae_kqueue.c"
  9.         #else
  10.         #include "ae_select.c"
  11.         #endif
  12.     #endif
  13. #endif
具体的evport原理还是得等到彻底结束redis这个部分之后再讲,今天只谈封装部分,redis当中封装evport的结构如下

点击(此处)折叠或打开

  1. typedef struct aeApiState {
  2.     int portfd; /* event port */
  3.     int npending; /* # of pending fds */
  4.     int pending_fds[MAX_EVENT_BATCHSZ]; /* pending fds */
  5.     int pending_masks[MAX_EVENT_BATCHSZ]; /* pending fds' masks */
  6. } aeApiState;
其中对于这个结构的常规调用如下:

点击(此处)折叠或打开

  1. static int aeApiCreate(aeEventLoop *eventLoop) {
  2.     int i;
  3.     aeApiState *state = zmalloc(sizeof(aeApiState));
  4.     if (!state) return -1;

  5.     state->portfd = port_create();
  6.     if (state->portfd == -1) {
  7.         zfree(state);
  8.         return -1;
  9.     }

  10.     state->npending = 0;

  11.     for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
  12.         state->pending_fds[i] = -1;
  13.         state->pending_masks[i] = AE_NONE;
  14.     }

  15.     eventLoop->apidata = state;
  16.     return 0;
  17. }

  18. static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
  19.     /* Nothing to resize here. */
  20.     return 0;
  21. }

  22. static void aeApiFree(aeEventLoop *eventLoop) {
  23.     aeApiState *state = eventLoop->apidata;

  24.     close(state->portfd);
  25.     zfree(state);
  26. }

  27. static int aeApiLookupPending(aeApiState *state, int fd) {
  28.     int i;

  29.     for (i = 0; i < state->npending; i++) {
  30.         if (state->pending_fds[i] == fd)
  31.             return (i);
  32.     }

  33.     return (-1);
  34. }

  35. /*
  36.  * Helper function to invoke port_associate for the given fd and mask.
  37.  */
  38. static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
  39.     int events = 0;
  40.     int rv, err;

  41.     if (mask & AE_READABLE)
  42.         events |= POLLIN;
  43.     if (mask & AE_WRITABLE)
  44.         events |= POLLOUT;

  45.     if (evport_debug)
  46.         fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);

  47.     rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
  48.         (void *)(uintptr_t)mask);
  49.     err = errno;

  50.     if (evport_debug)
  51.         fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));

  52.     if (rv == -1) {
  53.         fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));

  54.         if (err == EAGAIN)
  55.             fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
  56.     }

  57.     return rv;
  58. }

  59. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  60.     aeApiState *state = eventLoop->apidata;
  61.     int fullmask, pfd;

  62.     if (evport_debug)
  63.         fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);

  64.     /*
  65.      * Since port_associate's "events" argument replaces any existing events, we
  66.      * must be sure to include whatever events are already associated when
  67.      * we call port_associate() again.
  68.      */
  69.     fullmask = mask | eventLoop->events[fd].mask;
  70.     pfd = aeApiLookupPending(state, fd);

  71.     if (pfd != -1) {
  72.         /*
  73.          * This fd was recently returned from aeApiPoll. It should be safe to
  74.          * assume that the consumer has processed that poll event, but we play
  75.          * it safer by simply updating pending_mask. The fd will be
  76.          * re-associated as usual when aeApiPoll is called again.
  77.          */
  78.         if (evport_debug)
  79.             fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
  80.         state->pending_masks[pfd] |= fullmask;
  81.         return 0;
  82.     }

  83.     return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
  84. }

  85. static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
  86.     aeApiState *state = eventLoop->apidata;
  87.     int fullmask, pfd;

  88.     if (evport_debug)
  89.         fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);

  90.     pfd = aeApiLookupPending(state, fd);

  91.     if (pfd != -1) {
  92.         if (evport_debug)
  93.             fprintf(stderr, "deleting event from pending fd %d\n", fd);

  94.         /*
  95.          * This fd was just returned from aeApiPoll, so it's not currently
  96.          * associated with the port. All we need to do is update
  97.          * pending_mask appropriately.
  98.          */
  99.         state->pending_masks[pfd] &= ~mask;

  100.         if (state->pending_masks[pfd] == AE_NONE)
  101.             state->pending_fds[pfd] = -1;

  102.         return;
  103.     }

  104.     /*
  105.      * The fd is currently associated with the port. Like with the add case
  106.      * above, we must look at the full mask for the file descriptor before
  107.      * updating that association. We don't have a good way of knowing what the
  108.      * events are without looking into the eventLoop state directly. We rely on
  109.      * the fact that our caller has already updated the mask in the eventLoop.
  110.      */

  111.     fullmask = eventLoop->events[fd].mask;
  112.     if (fullmask == AE_NONE) {
  113.         /*
  114.          * We're removing *all* events, so use port_dissociate to remove the
  115.          * association completely. Failure here indicates a bug.
  116.          */
  117.         if (evport_debug)
  118.             fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);

  119.         if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
  120.             perror("aeApiDelEvent: port_dissociate");
  121.             abort(); /* will not return */
  122.         }
  123.     } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
  124.         fullmask) != 0) {
  125.         /*
  126.          * ENOMEM is a potentially transient condition, but the kernel won't
  127.          * generally return it unless things are really bad. EAGAIN indicates
  128.          * we've reached an resource limit, for which it doesn't make sense to
  129.          * retry (counter-intuitively). All other errors indicate a bug. In any
  130.          * of these cases, the best we can do is to abort.
  131.          */
  132.         abort(); /* will not return */
  133.     }
  134. }
  135. static char *aeApiName(void) {
  136.     return "evport";
  137. }

关键调用api如下:

点击(此处)折叠或打开

  1. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  2.     aeApiState *state = eventLoop->apidata;
  3.     struct timespec timeout, *tsp;
  4.     int mask, i;
  5.     uint_t nevents;
  6.     port_event_t event[MAX_EVENT_BATCHSZ];

  7.     /*
  8.      * If we've returned fd events before, we must re-associate them with the
  9.      * port now, before calling port_get(). See the block comment at the top of
  10.      * this file for an explanation of why.
  11.      */
  12.     for (i = 0; i < state->npending; i++) {
  13.         if (state->pending_fds[i] == -1)
  14.             /* This fd has since been deleted. */
  15.             continue;

  16.         if (aeApiAssociate("aeApiPoll", state->portfd,
  17.             state->pending_fds[i], state->pending_masks[i]) != 0) {
  18.             /* See aeApiDelEvent for why this case is fatal. */
  19.             abort();
  20.         }

  21.         state->pending_masks[i] = AE_NONE;
  22.         state->pending_fds[i] = -1;
  23.     }

  24.     state->npending = 0;

  25.     if (tvp != NULL) {
  26.         timeout.tv_sec = tvp->tv_sec;
  27.         timeout.tv_nsec = tvp->tv_usec * 1000;
  28.         tsp = &timeout;
  29.     } else {
  30.         tsp = NULL;
  31.     }

  32.     /*
  33.      * port_getn can return with errno == ETIME having returned some events (!).
  34.      * So if we get ETIME, we check nevents, too.
  35.      */
  36.     nevents = 1;
  37.     if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
  38.         tsp) == -1 && (errno != ETIME || nevents == 0)) {
  39.         if (errno == ETIME || errno == EINTR)
  40.             return 0;

  41.         /* Any other error indicates a bug. */
  42.         perror("aeApiPoll: port_get");
  43.         abort();
  44.     }

  45.     state->npending = nevents;

  46.     for (i = 0; i < nevents; i++) {
  47.             mask = 0;
  48.             if (event[i].portev_events & POLLIN)
  49.                 mask |= AE_READABLE;
  50.             if (event[i].portev_events & POLLOUT)
  51.                 mask |= AE_WRITABLE;

  52.             eventLoop->fired[i].fd = event[i].portev_object;
  53.             eventLoop->fired[i].mask = mask;

  54.             if (evport_debug)
  55.                 fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
  56.                     (int)event[i].portev_object, mask);

  57.             state->pending_fds[i] = event[i].portev_object;
  58.             state->pending_masks[i] = (uintptr_t)event[i].portev_user;
  59.     }

  60.     return nevents;
  61. }
redis当中的I/O多路复用就暂时讲到这里了,还请各位看官老爷多提宝贵意见~~~
阅读(12318) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~