Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1861644
  • 博文数量: 152
  • 博客积分: 3730
  • 博客等级: 上尉
  • 技术积分: 3710
  • 用 户 组: 普通用户
  • 注册时间: 2011-06-02 14:36
个人简介

减肥,运动,学习,进步...

文章分类

全部博文(152)

文章存档

2016年(14)

2015年(17)

2014年(16)

2013年(4)

2012年(66)

2011年(35)

分类: LINUX

2015-02-07 23:05:44

网络编程中收发包处理是非常重要的过程,网络通信的本质就是报文的收发,在报文收发的基础上进行相关逻辑的处理和扩展。因此在libevent中也有报文收发处理相关的实现,在libevent中报文的发送和接收支持多缓冲区以及零拷贝相关的技术,本文就libevent的收发包处理进行分析。

在网络编程中需要对数据进行一定的管理和设计,在libevent中提供了一系列的数据结构:
struct evbuffer_chain:该结构体主要用于抽象一块内存空间。采用链表的方式进行管理,其中有该内存大小的相关字段。
struct evbuffer:该结构体主要用于抽象内存空间的管理器,其中指出了内存块的多少等重要信息,另外包含了相关信息,用于标识该管理归属的socket。在网络编程中收发包处理都是基于socket,
struct bufferevent:该结构体中主要是对某一个socket的收发包队列的抽象,其中包含了接收队列和发送队列、事件,每个socket可以包含一个读事件和一个写事件以及对应的回调函数。

点击(此处)折叠或打开

  1. /* 缓存块*/
  2. /** A single item in an evbuffer. */
  3. struct evbuffer_chain {
  4.     /** points to next buffer in the chain */
  5.         /* 链表的实现 */
  6.     struct evbuffer_chain *next;

  7.     /** total allocation available in the buffer field. */
  8.     size_t buffer_len;

  9.     /** unused space at the beginning of buffer or an offset into a
  10.      * file for sendfile buffers. */
  11.     ev_off_t misalign;

  12.     /** Offset into buffer + misalign at which to start writing.
  13.      * In other words, the total number of bytes actually stored
  14.      * in buffer. */
  15.     size_t off;

  16.     /** Set if special handling is required for this chain */
  17.     unsigned flags;
  18.         ...
  19.          /* 实际的数据区 */
  20.     unsigned char *buffer;
  21. };

  22. struct evbuffer {
  23.     /** The first chain in this buffer's linked list of chains. */
  24.     /* 指向第一个chain */
  25.     struct evbuffer_chain *first;
  26.     /** The last chain in this buffer's linked list of chains. */
  27.     struct evbuffer_chain *last;

  28.     /** Pointer to the next pointer pointing at the 'last_with_data' chain.
  29.      *
  30.      * To unpack:
  31.      *
  32.      * The last_with_data chain is the last chain that has any data in it.
  33.      * If all chains in the buffer are empty, it is the first chain.
  34.      * If the buffer has no chains, it is NULL.
  35.      *
  36.      * The last_with_datap pointer points at _whatever 'next' pointer_
  37.      * points at the last_with_datap chain. If the last_with_data chain
  38.      * is the first chain, or it is NULL, then the last_with_datap pointer
  39.      * is &buf->first.
  40.      */
  41.      /* 指向最后一个 */
  42.     struct evbuffer_chain **last_with_datap;

  43.     /** Total amount of bytes stored in all chains.*/
  44.     size_t total_len;

  45.     /* 最后一次调用只有添加的字节和删除的字节 */
  46.     /** Number of bytes we have added to the buffer since we last tried to
  47.      * invoke callbacks. */
  48.     size_t n_add_for_cb;
  49.     /** Number of bytes we have removed from the buffer since we last
  50.      * tried to invoke callbacks. */
  51.     size_t n_del_for_cb;
  52.     ...
  53.     /* 延迟队列 */
  54.     /** Used to implement deferred callbacks. */
  55.     struct deferred_cb_queue *cb_queue;

  56.     /** A reference count on this evbuffer.     When the reference count
  57.      * reaches 0, the buffer is destroyed.    Manipulated with
  58.      * evbuffer_incref and evbuffer_decref_and_unlock and
  59.      * evbuffer_free. */
  60.     int refcnt;

  61.     /** A deferred_cb handle to make all of this buffer's callbacks
  62.      * invoked from the event loop. */
  63.      /* 延迟回调块 */
  64.     struct deferred_cb deferred;

  65.     /* 延迟队列回调函数链表 */
  66.     /** A doubly-linked-list of callback functions */
  67.     TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;

  68.     /** The parent bufferevent object this evbuffer belongs to.
  69.      * NULL if the evbuffer stands alone. */
  70.      /* 归属的bufferevent */
  71.     struct bufferevent *parent;
  72. };

  73. /* buffer事件 */
  74. struct bufferevent {
  75.     /** Event base for which this bufferevent was created. */
  76.     struct event_base *ev_base;
  77.     /** Pointer to a table of function pointers to set up how this
  78.      bufferevent behaves. */
  79.      /* 设置bufferevent的行为的操作函数 */
  80.     const struct bufferevent_ops *be_ops;

  81.     /** A read event that triggers when a timeout has happened or a socket
  82.      is ready to read data. Only used by some subtypes of
  83.      bufferevent. */
  84.      /* 读事件 */
  85.     struct event ev_read;
  86.     /** A write event that triggers when a timeout has happened or a socket
  87.      is ready to write data. Only used by some subtypes of
  88.      bufferevent. */
  89.      /* 写事件 */
  90.     struct event ev_write;

  91.     /** An input buffer. Only the bufferevent is allowed to add data to
  92.      this buffer, though the user is allowed to drain it. */
  93.      /* 对应的接收缓存 */
  94.     struct evbuffer *input;

  95.     /** An input buffer. Only the bufferevent is allowed to drain data
  96.      from this buffer, though the user is allowed to add it. */
  97.      /* 发送缓存 */
  98.     struct evbuffer *output;

  99.     /* 用于读写限速控制的结构体 */
  100.     struct event_watermark wm_read;
  101.     struct event_watermark wm_write;

  102.     /* 读回调函数 */
  103.     bufferevent_data_cb readcb;
  104.     /* 写回调函数 */
  105.     bufferevent_data_cb writecb;
  106.     /* This should be called 'eventcb', but renaming it would break
  107.     void *cbarg;
  108.     ...
  109. }
代码实现过程:
接下来主要是fd与bufferevent的绑定过程:

点击(此处)折叠或打开

  1. /* 针对fd的bufferevent */
  2. struct bufferevent *
  3. bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
  4.     int options)
  5. {
  6.     struct bufferevent_private *bufev_p;
  7.     struct bufferevent *bufev;

  8.     if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
  9.         return NULL;

  10.     if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
  11.                  options) < 0) {
  12.         mm_free(bufev_p);
  13.         return NULL;
  14.     }
  15.     bufev = &bufev_p->bev;
  16.     evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

  17.     /* 添加基于fd的读事件,对应的回调函数为bufferevent_readcb,参数为bufferent */
  18.     event_assign(&bufev->ev_read, bufev->ev_base, fd,
  19.      EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
  20.     /* 添加基于fd的写事件,对应的回调函数为bufferevent_writecb,参数为bufferent */
  21.     event_assign(&bufev->ev_write, bufev->ev_base, fd,
  22.      EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

  23.     /* 添加evbuffer的回调函数为bufferevent_socket_outbuf_cb,主要针对发送数据的 */
  24.     evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

  25.     /* 允许接收数据 */
  26.     evbuffer_freeze(bufev->input, 0);
  27.     /* 冻结发送数据的缓存区 */
  28.     evbuffer_freeze(bufev->output, 1);

  29.     return bufev;
  30. }
通用bufferevent初始化函数,主要完成了缓存管理器的创建,也就是接收缓存管理器和发送缓存管理器两个缓存的创建和bufferevent的绑定关系,该初始化是通用接口,不止是针对socket有效。

点击(此处)折叠或打开

  1. /* bufferevent的初始化接口,不同的bufferevent通用的接口 */
  2. int
  3. bufferevent_init_common(struct bufferevent_private *bufev_private,
  4.     struct event_base *base,
  5.     const struct bufferevent_ops *ops,
  6.     enum bufferevent_options options)
  7. {
  8.     struct bufferevent *bufev = &bufev_private->bev;
  9.     
  10.     /* 创建接收报文缓冲管理器 */
  11.     if (!bufev->input) {
  12.         if ((bufev->input = evbuffer_new()) == NULL)
  13.             return -1;
  14.     }
  15.     
  16.     /* 创建接收发送缓冲管理器 */
  17.     if (!bufev->output) {
  18.         if ((bufev->output = evbuffer_new()) == NULL) {
  19.             evbuffer_free(bufev->input);
  20.             return -1;
  21.         }
  22.     }

  23.     bufev_private->refcnt = 1;
  24.     bufev->ev_base = base;

  25.     /* Disable timeouts. */
  26.     evutil_timerclear(&bufev->timeout_read);
  27.     evutil_timerclear(&bufev->timeout_write);

  28.     bufev->be_ops = ops;

  29.     /*
  30.      * Set to EV_WRITE so that using bufferevent_write is going to
  31.      * trigger a callback. Reading needs to be explicitly enabled
  32.      * because otherwise no data will be available.
  33.      */
  34.     bufev->enabled = EV_WRITE;

  35. #ifndef _EVENT_DISABLE_THREAD_SUPPORT
  36.     if (options & BEV_OPT_THREADSAFE) {
  37.         if (bufferevent_enable_locking(bufev, NULL) < 0) {
  38.             /* cleanup */
  39.             evbuffer_free(bufev->input);
  40.             evbuffer_free(bufev->output);
  41.             bufev->input = NULL;
  42.             bufev->output = NULL;
  43.             return -1;
  44.         }
  45.     }
  46. #endif
  47.     if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
  48.      == BEV_OPT_UNLOCK_CALLBACKS) {
  49.         event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
  50.         return -1;
  51.     }
  52.     if (options & BEV_OPT_DEFER_CALLBACKS) {
  53.         if (options & BEV_OPT_UNLOCK_CALLBACKS)
  54.             /* 初始化延迟队列的回调函数 */
  55.             event_deferred_cb_init(&bufev_private->deferred,
  56.              bufferevent_run_deferred_callbacks_unlocked,
  57.              bufev_private);
  58.         else
  59.             event_deferred_cb_init(&bufev_private->deferred,
  60.              bufferevent_run_deferred_callbacks_locked,
  61.              bufev_private);
  62.     }

  63.     bufev_private->options = options;

  64.     /* 设置两个缓存区的bufferevent,绑定了bufferent和evbuffer的关系 */
  65.     evbuffer_set_parent(bufev->input, bufev);
  66.     evbuffer_set_parent(bufev->output, bufev);

  67.     return 0;
  68. }
对于读事件的处理函数,主要完成了从socket接收报文,对应的回调函数:bufferevent_readcb(),而写事件的处理函数主要完成发送报文的操纵,对应的回调函数:bufferevent_writecb()

点击(此处)折叠或打开

  1. /* fd读事件的回调函数,也就是说实际的读操作会调用该函数 */
  2. static void
  3. bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
  4. {
  5.     struct bufferevent *bufev = arg; //间事件注册函数
  6.     struct bufferevent_private *bufev_p =
  7.      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  8.     struct evbuffer *input;
  9.     int res = 0;
  10.     short what = BEV_EVENT_READING;
  11.     ev_ssize_t howmuch = -1, readmax=-1;

  12.     _bufferevent_incref_and_lock(bufev);

  13.     if (event == EV_TIMEOUT) {
  14.         what |= BEV_EVENT_TIMEOUT;
  15.         goto error;
  16.     }

  17.     /* 获取缓存区 */
  18.     input = bufev->input;

  19.     /*
  20.      * If we have a high watermark configured then we don't want to
  21.      * read more data than would make us reach the watermark.
  22.      */
  23.     if (bufev->wm_read.high != 0) { /* 如果当前bufferevent设置有门限 */
  24.         howmuch = bufev->wm_read.high - evbuffer_get_length(input);
  25.         /* we somehow lowered the watermark, stop reading */
  26.         if (howmuch <= 0) { /* 暂停读的操作 */
  27.             bufferevent_wm_suspend_read(bufev);
  28.             goto done;
  29.         }
  30.     }

  31.     /* 获取能够读取的最大值 */
  32.     readmax = _bufferevent_get_read_max(bufev_p);
  33.     if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
  34.                      * uglifies this code. XXXX */
  35.         howmuch = readmax;
  36.     /* 如果当前的读操作为暂停,则不读取啦 */
  37.     if (bufev_p->read_suspended)
  38.         goto done;
  39.     
  40.     evbuffer_unfreeze(input, 0);
  41.     /* 实际是完成数据的读写操作 */
  42.     res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
  43.     evbuffer_freeze(input, 0);

  44.     if (res == -1) {
  45.         int err = evutil_socket_geterror(fd);
  46.         if (EVUTIL_ERR_RW_RETRIABLE(err))
  47.             goto reschedule;
  48.         /* error case */
  49.         what |= BEV_EVENT_ERROR;
  50.     } else if (res == 0) {
  51.         /* eof case */
  52.         what |= BEV_EVENT_EOF;
  53.     }

  54.     if (res <= 0)
  55.         goto error;

  56.     /* 根据当前的 */
  57.     _bufferevent_decrement_read_buckets(bufev_p, res);

  58.     /* Invoke the user callback - must always be called last */
  59.     /* 当读取的数据量大于最小值以后必须调用使用者注册的回调函数 */
  60.     if (evbuffer_get_length(input) >= bufev->wm_read.low)
  61.         _bufferevent_run_readcb(bufev);

  62.     goto done;

  63.  reschedule:
  64.     goto done;

  65.  error:
  66.     bufferevent_disable(bufev, EV_READ);
  67.     _bufferevent_run_eventcb(bufev, what);

  68.  done:
  69.      /* 该接口会根据引用计数对内存进行删除等操作 */
  70.     _bufferevent_decref_and_unlock(bufev);
  71. }

  72. static void
  73. bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
  74. {
  75.     struct bufferevent *bufev = arg;
  76.     struct bufferevent_private *bufev_p =
  77.      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  78.     int res = 0;
  79.     short what = BEV_EVENT_WRITING;
  80.     int connected = 0;
  81.     ev_ssize_t atmost = -1;

  82.     _bufferevent_incref_and_lock(bufev);

  83.     if (event == EV_TIMEOUT) {
  84.         what |= BEV_EVENT_TIMEOUT;
  85.         goto error;
  86.     }

  87.     /* 如果正在进行链接操作 */
  88.     if (bufev_p->connecting) {/* 判断是否正在进行连接处理 */
  89.         int c = evutil_socket_finished_connecting(fd);
  90.         /* we need to fake the error if the connection was refused
  91.          * immediately - usually connection to localhost on BSD */
  92.          /* 如果拒绝连接 */
  93.         if (bufev_p->connection_refused) {
  94.          bufev_p->connection_refused = 0;
  95.          c = -1;
  96.         }

  97.         if (c == 0)
  98.             goto done;

  99.         bufev_p->connecting = 0;
  100.         if (c < 0) { /* 删除数据 */
  101.             event_del(&bufev->ev_write);
  102.             event_del(&bufev->ev_read);
  103.             _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
  104.             goto done;
  105.         } else { /* 连接成功或者正在连接 */
  106.             connected = 1;
  107.             _bufferevent_run_eventcb(bufev,
  108.                     BEV_EVENT_CONNECTED);
  109.             if (!(bufev->enabled & EV_WRITE) ||
  110.              bufev_p->write_suspended) {
  111.                 event_del(&bufev->ev_write);
  112.                 goto done;
  113.             }
  114.         }
  115.     }

  116.     /* 获取最大的数据量,也就是当前缓存中最大的报文量 */
  117.     atmost = _bufferevent_get_write_max(bufev_p);

  118.     if (bufev_p->write_suspended)
  119.         goto done;

  120.     /* 有数据需要发送 */
  121.     if (evbuffer_get_length(bufev->output)) {
  122.         evbuffer_unfreeze(bufev->output, 1);
  123.         /* 写数据,该函数实现报文的发送操作,尽最大能力的发送 */
  124.         res = evbuffer_write_atmost(bufev->output, fd, atmost);
  125.         evbuffer_freeze(bufev->output, 1);
  126.         if (res == -1) {
  127.             int err = evutil_socket_geterror(fd);
  128.             if (EVUTIL_ERR_RW_RETRIABLE(err))
  129.                 goto reschedule;
  130.             what |= BEV_EVENT_ERROR;
  131.         } else if (res == 0) {
  132.             /* eof case
  133.              XXXX Actually, a 0 on write doesn't indicate
  134.              an EOF. An ECONNRESET might be more typical.
  135.              */
  136.             what |= BEV_EVENT_EOF;
  137.         }
  138.         if (res <= 0)
  139.             goto error;

  140.         _bufferevent_decrement_write_buckets(bufev_p, res);
  141.     }

  142.     /* 如果没有数据需要再发送,则删除该事件 */
  143.     if (evbuffer_get_length(bufev->output) == 0) {
  144.         event_del(&bufev->ev_write);
  145.     }

  146.     /*
  147.      * Invoke the user callback if our buffer is drained or below the
  148.      * low watermark.
  149.      */
  150.      /* 当缓存再output中的报文量低于某一个值的情况下,通告用户再次进行发送操作 */
  151.     if ((res || !connected) &&
  152.      evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
  153.         _bufferevent_run_writecb(bufev);
  154.     }

  155.     goto done;

  156.  reschedule:
  157.     if (evbuffer_get_length(bufev->output) == 0) {
  158.         event_del(&bufev->ev_write);
  159.     }
  160.     goto done;

  161.  error:
  162.     bufferevent_disable(bufev, EV_WRITE);
  163.     _bufferevent_run_eventcb(bufev, what);

  164.  done: /* bufferevent的控制 */
  165.     _bufferevent_decref_and_unlock(bufev);
  166. }
接收报文的过程中实际上根据当前能够接收的报文量去接收报文,其中input是管理接收报文的缓存管理器。能够接收的报文量由管理器和对应的流控处理确定。实际的读报文函数为evbuffer_read()。

点击(此处)折叠或打开

  1. /* evbuffer的读取操作 */
  2. int
  3. evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
  4. {
  5.     struct evbuffer_chain **chainp;
  6.     int n;
  7.     int result;

  8. #ifdef USE_IOVEC_IMPL
  9.     int nvecs, i, remaining;
  10. #else
  11.     struct evbuffer_chain *chain;
  12.     unsigned char *p;
  13. #endif

  14.     EVBUFFER_LOCK(buf);

  15.     if (buf->freeze_end) {
  16.         result = -1;
  17.         goto done;
  18.     }
  19.     
  20.     /* 获取当前fd能够读取的最大报文量 */
  21.     n = get_n_bytes_readable_on_socket(fd);
  22.     if (n <= 0 || n > EVBUFFER_MAX_READ)
  23.         n = EVBUFFER_MAX_READ;
  24.     if (howmuch < 0 || howmuch > n) /* 读写长度 */
  25.         howmuch = n;

  26. #ifdef USE_IOVEC_IMPL
  27.     /* Since we can use iovecs, we're willing to use the last
  28.      * NUM_READ_IOVEC chains. */
  29.     if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
  30.         result = -1;
  31.         goto done;
  32.     } else {
  33.         IOV_TYPE vecs[NUM_READ_IOVEC];
  34. #ifdef _EVBUFFER_IOVEC_IS_NATIVE
  35.         /* 实际是完成数据空间的预分配,即vecs空间的分配,4个vecs的空间,chainp是缓冲区的开始地址 */
  36.         nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
  37.          NUM_READ_IOVEC, &chainp, 1);
  38. #else
  39.         /* We aren't using the native struct iovec. Therefore,
  40.          we are on win32. */
  41.         struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
  42.         nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,
  43.          &chainp, 1);

  44.         for (i=0; i < nvecs; ++i)
  45.             WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
  46. #endif

  47.         /* 调用readv,采用多缓冲区的读写方式,linux的高级套接字,n是实际返回的长度 */
  48.         n = readv(fd, vecs, nvecs);
  49.     }

  50. #else /*!USE_IOVEC_IMPL*/
  51.     /* If we don't have FIONREAD, we might waste some space here */
  52.     /* XXX we _will_ waste some space here if there is any space left
  53.      * over on buf->last. */
  54.     if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
  55.         result = -1;
  56.         goto done;
  57.     }

  58.     /* We can append new data at this point */
  59.     p = chain->buffer + chain->misalign + chain->off;

  60. #ifndef WIN32
  61.     n = read(fd, p, howmuch);
  62. #else
  63.     /* 常规的报文读取操作 */
  64.     n = recv(fd, p, howmuch, 0);
  65. #endif
  66. #endif /* USE_IOVEC_IMPL */

  67.     if (n == -1) {
  68.         result = -1;
  69.         goto done;
  70.     }
  71.     if (n == 0) {
  72.         result = 0;
  73.         goto done;
  74.     }

  75. #ifdef USE_IOVEC_IMPL
  76.     remaining = n;
  77.     /* nvecs是指多个缓冲区,但是不一定有那么多的数据 */
  78.     for (i=0; i < nvecs; ++i) {
  79.         /* 获取chain的长度 */
  80.         ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);
  81.         if (space < remaining) { /* 若长度不够 */
  82.             (*chainp)->off += space; /* 则当前chain的内存使用完毕 */
  83.             remaining -= (int)space; /* 剩下的内存空间 */
  84.         } else {
  85.             (*chainp)->off += remaining; /* 当前空间已经足够 */
  86.             buf->last_with_datap = chainp;
  87.             break;
  88.         }
  89.         chainp = &(*chainp)->next;
  90.     }
  91. #else
  92.     chain->off += n;
  93.     advance_last_with_data(buf);
  94. #endif
  95.     /* 更新当前实际的有效长度 */
  96.     buf->total_len += n;
  97.     buf->n_add_for_cb += n;

  98.     /* Tell someone about changes in this buffer */
  99.     evbuffer_invoke_callbacks(buf);
  100.     result = n;
  101. done:
  102.     EVBUFFER_UNLOCK(buf);
  103.     return result;
  104. }
读处理过程中还实现了类似流控的机制,当接受到的报文大于了当前设置的最小门限后,会通告用户进行报文的读取操作:

点击(此处)折叠或打开

  1. void
  2. _bufferevent_run_readcb(struct bufferevent *bufev)
  3. {
  4.     /* Requires that we hold the lock and a reference */
  5.     struct bufferevent_private *p =
  6.      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  7.     if (bufev->readcb == NULL)
  8.         return;
  9.     if (p->options & BEV_OPT_DEFER_CALLBACKS) {
  10.         p->readcb_pending = 1;
  11.         if (!p->deferred.queued)
  12.             SCHEDULE_DEFERRED(p);
  13.     } else {
  14.         /* 调用对应的读回调函数,对应的对调函数为bufferevent中注册的用户接口 */
  15.         bufev->readcb(bufev, bufev->cbarg);
  16.     }
  17. }
在发送报文的过程中通过将函数evbuffer_write_atmost()实现,该函数会尽可能的发送对应的报文:

点击(此处)折叠或打开

  1. /* 尽最大可能的发送,通常会采用ivec或者sendfile */
  2. int
  3. evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
  4.     ev_ssize_t howmuch)
  5. {
  6.     int n = -1;

  7.     EVBUFFER_LOCK(buffer);

  8.     if (buffer->freeze_start) {
  9.         goto done;
  10.     }

  11.     /* 发送长度最大为当前buffer的长度 */
  12.     if (howmuch < 0 || (size_t)howmuch > buffer->total_len)
  13.         howmuch = buffer->total_len;

  14.     if (howmuch > 0) {
  15. #ifdef USE_SENDFILE /* 零拷贝的报文发送处理 */
  16.         struct evbuffer_chain *chain = buffer->first;
  17.         if (chain != NULL && (chain->flags & EVBUFFER_SENDFILE))
  18.             n = evbuffer_write_sendfile(buffer, fd, howmuch);
  19.         else {
  20. #endif
  21. #ifdef USE_IOVEC_IMPL
  22.         n = evbuffer_write_iovec(buffer, fd, howmuch);  /* 多缓冲区的数据发送操作,也能提高发送性能 */
  23. #elif defined(WIN32)
  24.         /* XXX(nickm) Don't disable this code until we know if
  25.          * the WSARecv code above works. */
  26.         void *p = evbuffer_pullup(buffer, howmuch);
  27.         n = send(fd, p, howmuch, 0);
  28. #else
  29.         /* 普通的报文发送处理 */
  30.         void *p = evbuffer_pullup(buffer, howmuch);
  31.         n = write(fd, p, howmuch);
  32. #endif
  33. #ifdef USE_SENDFILE
  34.         }
  35. #endif
  36.     }

  37.     if (n > 0) /* 报文发送完成之后将evbuffer丢弃 */
  38.         evbuffer_drain(buffer, n);

  39. done:
  40.     EVBUFFER_UNLOCK(buffer);
  41.     return (n);
  42. }
从上述的过程中可以发现,在发送报文的流程中存在多种方式,其中主要是零拷贝的发送、多缓冲区发送、普通的发送,在报文发送完成之后将对应的缓存丢弃。使用这的报文处理实际上是通过回调的通告方式,当当前缓存中的报文量少于一定量的情况下就会回调注册的写函数进行报文的发送(使用者将报文写到缓存中的过程)。具体的如下所示:

点击(此处)折叠或打开

  1. /* 调用用户的发送回调函数 */
  2. void
  3. _bufferevent_run_writecb(struct bufferevent *bufev)
  4. {
  5.     /* Requires that we hold the lock and a reference */
  6.     struct bufferevent_private *p =
  7.      EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
  8.     if (bufev->writecb == NULL)
  9.         return;
  10.     if (p->options & BEV_OPT_DEFER_CALLBACKS) {
  11.         p->writecb_pending = 1;
  12.         if (!p->deferred.queued)
  13.             SCHEDULE_DEFERRED(p);
  14.     } else {
  15.         /* 调用写的回调函数,在该函数中用户可进行报文的发送处理 */
  16.         bufev->writecb(bufev, bufev->cbarg);
  17.     }
  18. }
以上的过程基本就完成了报文的接收和发送操作。

该系列的分析都是基于libevent2.0.20.

阅读(5489) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~