网络编程中收发包处理是非常重要的过程,网络通信的本质就是报文的收发,在报文收发的基础上进行相关逻辑的处理和扩展。因此在libevent中也有报文收发处理相关的实现,在libevent中报文的发送和接收支持多缓冲区以及零拷贝相关的技术,本文就libevent的收发包处理进行分析。
在网络编程中需要对数据进行一定的管理和设计,在libevent中提供了一系列的数据结构:
struct
evbuffer_chain:该结构体主要用于抽象一块内存空间。采用链表的方式进行管理,其中有该内存大小的相关字段。
struct
evbuffer:该结构体主要用于抽象内存空间的管理器,其中指出了内存块的多少等重要信息,另外包含了相关信息,用于标识该管理归属的socket。在网络编程中收发包处理都是基于socket,
struct
bufferevent:该结构体中主要是对某一个socket的收发包队列的抽象,其中包含了接收队列和发送队列、事件,每个socket可以包含一个读事件和一个写事件以及对应的回调函数。
-
/* 缓存块*/
-
/** A single item in an evbuffer. */
-
struct evbuffer_chain {
-
/** points to next buffer in the chain */
-
/* 链表的实现 */
-
struct evbuffer_chain *next;
-
-
/** total allocation available in the buffer field. */
-
size_t buffer_len;
-
-
/** unused space at the beginning of buffer or an offset into a
-
* file for sendfile buffers. */
-
ev_off_t misalign;
-
-
/** Offset into buffer + misalign at which to start writing.
-
* In other words, the total number of bytes actually stored
-
* in buffer. */
-
size_t off;
-
-
/** Set if special handling is required for this chain */
-
unsigned flags;
-
...
-
/* 实际的数据区 */
-
unsigned char *buffer;
-
};
-
-
struct evbuffer {
-
/** The first chain in this buffer's linked list of chains. */
-
/* 指向第一个chain */
-
struct evbuffer_chain *first;
-
/** The last chain in this buffer's linked list of chains. */
-
struct evbuffer_chain *last;
-
-
/** Pointer to the next pointer pointing at the 'last_with_data' chain.
-
*
-
* To unpack:
-
*
-
* The last_with_data chain is the last chain that has any data in it.
-
* If all chains in the buffer are empty, it is the first chain.
-
* If the buffer has no chains, it is NULL.
-
*
-
* The last_with_datap pointer points at _whatever 'next' pointer_
-
* points at the last_with_datap chain. If the last_with_data chain
-
* is the first chain, or it is NULL, then the last_with_datap pointer
-
* is &buf->first.
-
*/
-
/* 指向最后一个 */
-
struct evbuffer_chain **last_with_datap;
-
-
/** Total amount of bytes stored in all chains.*/
-
size_t total_len;
-
-
/* 最后一次调用只有添加的字节和删除的字节 */
-
/** Number of bytes we have added to the buffer since we last tried to
-
* invoke callbacks. */
-
size_t n_add_for_cb;
-
/** Number of bytes we have removed from the buffer since we last
-
* tried to invoke callbacks. */
-
size_t n_del_for_cb;
-
...
-
/* 延迟队列 */
-
/** Used to implement deferred callbacks. */
-
struct deferred_cb_queue *cb_queue;
-
-
/** A reference count on this evbuffer. When the reference count
-
* reaches 0, the buffer is destroyed. Manipulated with
-
* evbuffer_incref and evbuffer_decref_and_unlock and
-
* evbuffer_free. */
-
int refcnt;
-
-
/** A deferred_cb handle to make all of this buffer's callbacks
-
* invoked from the event loop. */
-
/* 延迟回调块 */
-
struct deferred_cb deferred;
-
-
/* 延迟队列回调函数链表 */
-
/** A doubly-linked-list of callback functions */
-
TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
-
-
/** The parent bufferevent object this evbuffer belongs to.
-
* NULL if the evbuffer stands alone. */
-
/* 归属的bufferevent */
-
struct bufferevent *parent;
-
};
-
-
/* buffer事件 */
-
struct bufferevent {
-
/** Event base for which this bufferevent was created. */
-
struct event_base *ev_base;
-
/** Pointer to a table of function pointers to set up how this
-
bufferevent behaves. */
-
/* 设置bufferevent的行为的操作函数 */
-
const struct bufferevent_ops *be_ops;
-
-
/** A read event that triggers when a timeout has happened or a socket
-
is ready to read data. Only used by some subtypes of
-
bufferevent. */
-
/* 读事件 */
-
struct event ev_read;
-
/** A write event that triggers when a timeout has happened or a socket
-
is ready to write data. Only used by some subtypes of
-
bufferevent. */
-
/* 写事件 */
-
struct event ev_write;
-
-
/** An input buffer. Only the bufferevent is allowed to add data to
-
this buffer, though the user is allowed to drain it. */
-
/* 对应的接收缓存 */
-
struct evbuffer *input;
-
-
/** An input buffer. Only the bufferevent is allowed to drain data
-
from this buffer, though the user is allowed to add it. */
-
/* 发送缓存 */
-
struct evbuffer *output;
-
-
/* 用于读写限速控制的结构体 */
-
struct event_watermark wm_read;
-
struct event_watermark wm_write;
-
-
/* 读回调函数 */
-
bufferevent_data_cb readcb;
-
/* 写回调函数 */
-
bufferevent_data_cb writecb;
-
/* This should be called 'eventcb', but renaming it would break
-
void *cbarg;
-
...
-
}
代码实现过程:
接下来主要是fd与bufferevent的绑定过程:
-
/* 针对fd的bufferevent */
-
struct bufferevent *
-
bufferevent_socket_new(struct event_base
*base,
evutil_socket_t fd,
-
int options)
-
{
-
struct bufferevent_private *bufev_p;
-
struct bufferevent *bufev;
-
-
if ((bufev_p
= mm_calloc(1,
sizeof(struct bufferevent_private)))== NULL)
-
return NULL;
-
-
if (bufferevent_init_common(bufev_p,
base, &bufferevent_ops_socket,
-
options) < 0) {
-
mm_free(bufev_p);
-
return NULL;
-
}
-
bufev = &bufev_p->bev;
-
evbuffer_set_flags(bufev->output,
EVBUFFER_FLAG_DRAINS_TO_FD);
-
-
/*
添加基于fd的读事件,对应的回调函数为bufferevent_readcb,参数为bufferent */
-
event_assign(&bufev->ev_read,
bufev->ev_base,
fd,
-
EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
-
/*
添加基于fd的写事件,对应的回调函数为bufferevent_writecb,参数为bufferent */
-
event_assign(&bufev->ev_write,
bufev->ev_base,
fd,
-
EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
-
-
/*
添加evbuffer的回调函数为bufferevent_socket_outbuf_cb,主要针对发送数据的 */
-
evbuffer_add_cb(bufev->output,
bufferevent_socket_outbuf_cb, bufev);
-
-
/* 允许接收数据 */
-
evbuffer_freeze(bufev->input,
0);
-
/* 冻结发送数据的缓存区 */
-
evbuffer_freeze(bufev->output,
1);
-
-
return bufev;
-
}
通用bufferevent初始化函数,主要完成了缓存管理器的创建,也就是接收缓存管理器和发送缓存管理器两个缓存的创建和bufferevent的绑定关系,该初始化是通用接口,不止是针对socket有效。
-
/* bufferevent的初始化接口,不同的bufferevent通用的接口 */
-
int
-
bufferevent_init_common(struct
bufferevent_private *bufev_private,
-
struct event_base *base,
-
const struct bufferevent_ops *ops,
-
enum bufferevent_options options)
-
{
-
struct bufferevent *bufev = &bufev_private->bev;
-
-
/* 创建接收报文缓冲管理器 */
-
if (!bufev->input)
{
-
if ((bufev->input
= evbuffer_new()) == NULL)
-
return -1;
-
}
-
-
/* 创建接收发送缓冲管理器 */
-
if (!bufev->output)
{
-
if ((bufev->output
= evbuffer_new()) == NULL) {
-
evbuffer_free(bufev->input);
-
return -1;
-
}
-
}
-
-
bufev_private->refcnt =
1;
-
bufev->ev_base =
base;
-
-
/* Disable timeouts. */
-
evutil_timerclear(&bufev->timeout_read);
-
evutil_timerclear(&bufev->timeout_write);
-
-
bufev->be_ops =
ops;
-
-
/*
-
* Set to
EV_WRITE so that using bufferevent_write is
going to
-
* trigger a callback. Reading needs to be explicitly enabled
-
* because otherwise no data will be
available.
-
*/
-
bufev->enabled =
EV_WRITE;
-
-
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
-
if (options &
BEV_OPT_THREADSAFE) {
-
if (bufferevent_enable_locking(bufev, NULL) < 0) {
-
/* cleanup */
-
evbuffer_free(bufev->input);
-
evbuffer_free(bufev->output);
-
bufev->input =
NULL;
-
bufev->output =
NULL;
-
return -1;
-
}
-
}
-
#endif
-
if ((options
& (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
-
== BEV_OPT_UNLOCK_CALLBACKS) {
-
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
-
return -1;
-
}
-
if (options &
BEV_OPT_DEFER_CALLBACKS) {
-
if (options &
BEV_OPT_UNLOCK_CALLBACKS)
-
/* 初始化延迟队列的回调函数 */
-
event_deferred_cb_init(&bufev_private->deferred,
-
bufferevent_run_deferred_callbacks_unlocked,
-
bufev_private);
-
else
-
event_deferred_cb_init(&bufev_private->deferred,
-
bufferevent_run_deferred_callbacks_locked,
-
bufev_private);
-
}
-
-
bufev_private->options =
options;
-
-
/* 设置两个缓存区的bufferevent,绑定了bufferent和evbuffer的关系
*/
-
evbuffer_set_parent(bufev->input,
bufev);
-
evbuffer_set_parent(bufev->output,
bufev);
-
-
return 0;
-
}
对于
读事件的处理函数,主要完成了从socket接收报文,对应的回调函数:bufferevent_readcb(),而写事件的处理函数主要完成发送报文的操纵,对应的回调函数
:bufferevent_writecb()。
-
/* fd读事件的回调函数,也就是说实际的读操作会调用该函数 */
-
static void
-
bufferevent_readcb(evutil_socket_t
fd, short event, void *arg)
-
{
-
struct bufferevent *bufev = arg; //间事件注册函数
-
struct bufferevent_private *bufev_p
=
-
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
-
struct evbuffer *input;
-
int res = 0;
-
short what = BEV_EVENT_READING;
-
ev_ssize_t howmuch = -1,
readmax=-1;
-
-
_bufferevent_incref_and_lock(bufev);
-
-
if (event == EV_TIMEOUT)
{
-
what |= BEV_EVENT_TIMEOUT;
-
goto error;
-
}
-
-
/* 获取缓存区 */
-
input = bufev->input;
-
-
/*
-
* If we have a high watermark configured then we don't
want to
-
* read more data than would make us
reach the watermark.
-
*/
-
if (bufev->wm_read.high != 0) { /* 如果当前bufferevent设置有门限 */
-
howmuch = bufev->wm_read.high -
evbuffer_get_length(input);
-
/* we somehow lowered the watermark, stop reading */
-
if (howmuch <= 0) { /* 暂停读的操作
*/
-
bufferevent_wm_suspend_read(bufev);
-
goto done;
-
}
-
}
-
-
/* 获取能够读取的最大值 */
-
readmax =
_bufferevent_get_read_max(bufev_p);
-
if (howmuch <
0 ||
howmuch > readmax) /* The use of -1 for "unlimited"
-
* uglifies this
code. XXXX */
-
howmuch = readmax;
-
/* 如果当前的读操作为暂停,则不读取啦 */
-
if (bufev_p->read_suspended)
-
goto done;
-
-
evbuffer_unfreeze(input, 0);
-
/* 实际是完成数据的读写操作 */
-
res = evbuffer_read(input,
fd, (int)howmuch); /* XXXX evbuffer_read would do better to
take and return ev_ssize_t */
-
evbuffer_freeze(input, 0);
-
-
if (res == -1) {
-
int err =
evutil_socket_geterror(fd);
-
if (EVUTIL_ERR_RW_RETRIABLE(err))
-
goto reschedule;
-
/* error case */
-
what |= BEV_EVENT_ERROR;
-
} else if (res == 0) {
-
/* eof case
*/
-
what |= BEV_EVENT_EOF;
-
}
-
-
if (res <= 0)
-
goto error;
-
-
/* 根据当前的 */
-
_bufferevent_decrement_read_buckets(bufev_p,
res);
-
-
/* Invoke the user callback - must always be called last */
-
/* 当读取的数据量大于最小值以后必须调用使用者注册的回调函数 */
-
if (evbuffer_get_length(input) >=
bufev->wm_read.low)
-
_bufferevent_run_readcb(bufev);
-
-
goto done;
-
-
reschedule:
-
goto done;
-
-
error:
-
bufferevent_disable(bufev, EV_READ);
-
_bufferevent_run_eventcb(bufev, what);
-
-
done:
-
/* 该接口会根据引用计数对内存进行删除等操作 */
-
_bufferevent_decref_and_unlock(bufev);
-
}
-
-
static void
-
bufferevent_writecb(evutil_socket_t
fd, short event, void *arg)
-
{
-
struct bufferevent *bufev = arg;
-
struct bufferevent_private *bufev_p
=
-
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
-
int res = 0;
-
short what = BEV_EVENT_WRITING;
-
int connected = 0;
-
ev_ssize_t atmost = -1;
-
-
_bufferevent_incref_and_lock(bufev);
-
-
if (event == EV_TIMEOUT)
{
-
what |= BEV_EVENT_TIMEOUT;
-
goto error;
-
}
-
-
/* 如果正在进行链接操作 */
-
if (bufev_p->connecting) {/* 判断是否正在进行连接处理 */
-
int c = evutil_socket_finished_connecting(fd);
-
/* we need to
fake the error if the connection was refused
-
* immediately - usually connection to localhost on BSD */
-
/* 如果拒绝连接 */
-
if (bufev_p->connection_refused) {
-
bufev_p->connection_refused = 0;
-
c = -1;
-
}
-
-
if (c == 0)
-
goto done;
-
-
bufev_p->connecting = 0;
-
if (c < 0) { /* 删除数据 */
-
event_del(&bufev->ev_write);
-
event_del(&bufev->ev_read);
-
_bufferevent_run_eventcb(bufev,
BEV_EVENT_ERROR);
-
goto done;
-
} else { /* 连接成功或者正在连接
*/
-
connected = 1;
-
_bufferevent_run_eventcb(bufev,
-
BEV_EVENT_CONNECTED);
-
if (!(bufev->enabled & EV_WRITE) ||
-
bufev_p->write_suspended) {
-
event_del(&bufev->ev_write);
-
goto done;
-
}
-
}
-
}
-
-
/* 获取最大的数据量,也就是当前缓存中最大的报文量 */
-
atmost =
_bufferevent_get_write_max(bufev_p);
-
-
if (bufev_p->write_suspended)
-
goto done;
-
-
/* 有数据需要发送 */
-
if (evbuffer_get_length(bufev->output)) {
-
evbuffer_unfreeze(bufev->output,
1);
-
/* 写数据,该函数实现报文的发送操作,尽最大能力的发送 */
-
res = evbuffer_write_atmost(bufev->output,
fd, atmost);
-
evbuffer_freeze(bufev->output,
1);
-
if (res == -1) {
-
int err =
evutil_socket_geterror(fd);
-
if (EVUTIL_ERR_RW_RETRIABLE(err))
-
goto reschedule;
-
what |= BEV_EVENT_ERROR;
-
} else if (res == 0) {
-
/* eof case
-
XXXX Actually, a 0 on write doesn't indicate
-
an EOF. An ECONNRESET might
be more typical.
-
*/
-
what |= BEV_EVENT_EOF;
-
}
-
if (res <= 0)
-
goto error;
-
-
_bufferevent_decrement_write_buckets(bufev_p,
res);
-
}
-
-
/* 如果没有数据需要再发送,则删除该事件 */
-
if (evbuffer_get_length(bufev->output)
==
0) {
-
event_del(&bufev->ev_write);
-
}
-
-
/*
-
* Invoke the user callback if our buffer is drained or
below the
-
* low watermark.
-
*/
-
/* 当缓存再output中的报文量低于某一个值的情况下,通告用户再次进行发送操作 */
-
if ((res || !connected)
&&
-
evbuffer_get_length(bufev->output)
<=
bufev->wm_write.low) {
-
_bufferevent_run_writecb(bufev);
-
}
-
-
goto done;
-
-
reschedule:
-
if (evbuffer_get_length(bufev->output)
==
0) {
-
event_del(&bufev->ev_write);
-
}
-
goto done;
-
-
error:
-
bufferevent_disable(bufev, EV_WRITE);
-
_bufferevent_run_eventcb(bufev, what);
-
-
done: /*
bufferevent的控制 */
-
_bufferevent_decref_and_unlock(bufev);
-
}
接收报文的过程中实际上根据当前能够接收的报文量去接收报文,其中input是管理接收报文的缓存管理器。能够接收的报文量由管理器和对应的流控处理确定。实际的读报文函数为evbuffer_read()。
-
/* evbuffer的读取操作 */
-
int
-
evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
-
{
-
struct evbuffer_chain **chainp;
-
int n;
-
int result;
-
-
#ifdef USE_IOVEC_IMPL
-
int nvecs, i, remaining;
-
#else
-
struct evbuffer_chain *chain;
-
unsigned char *p;
-
#endif
-
-
EVBUFFER_LOCK(buf);
-
-
if (buf->freeze_end) {
-
result = -1;
-
goto done;
-
}
-
-
/* 获取当前fd能够读取的最大报文量 */
-
n = get_n_bytes_readable_on_socket(fd);
-
if (n <= 0 || n > EVBUFFER_MAX_READ)
-
n = EVBUFFER_MAX_READ;
-
if (howmuch < 0 || howmuch > n) /* 读写长度 */
-
howmuch = n;
-
-
#ifdef USE_IOVEC_IMPL
-
/* Since we can use iovecs, we're willing to use the last
-
* NUM_READ_IOVEC chains. */
-
if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
-
result = -1;
-
goto done;
-
} else {
-
IOV_TYPE vecs[NUM_READ_IOVEC];
-
#ifdef _EVBUFFER_IOVEC_IS_NATIVE
-
/* 实际是完成数据空间的预分配,即vecs空间的分配,4个vecs的空间,chainp是缓冲区的开始地址 */
-
nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
-
NUM_READ_IOVEC, &chainp, 1);
-
#else
-
/* We aren't using the native struct iovec. Therefore,
-
we are on win32. */
-
struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
-
nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,
-
&chainp, 1);
-
-
for (i=0; i < nvecs; ++i)
-
WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
-
#endif
-
-
/* 调用readv,采用多缓冲区的读写方式,linux的高级套接字,n是实际返回的长度 */
-
n = readv(fd, vecs, nvecs);
-
}
-
-
#else /*!USE_IOVEC_IMPL*/
-
/* If we don't have FIONREAD, we might waste some space here */
-
/* XXX we _will_ waste some space here if there is any space left
-
* over on buf->last. */
-
if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
-
result = -1;
-
goto done;
-
}
-
-
/* We can append new data at this point */
-
p = chain->buffer + chain->misalign + chain->off;
-
-
#ifndef WIN32
-
n = read(fd, p, howmuch);
-
#else
-
/* 常规的报文读取操作 */
-
n = recv(fd, p, howmuch, 0);
-
#endif
-
#endif /* USE_IOVEC_IMPL */
-
-
if (n == -1) {
-
result = -1;
-
goto done;
-
}
-
if (n == 0) {
-
result = 0;
-
goto done;
-
}
-
-
#ifdef USE_IOVEC_IMPL
-
remaining = n;
-
/* nvecs是指多个缓冲区,但是不一定有那么多的数据 */
-
for (i=0; i < nvecs; ++i) {
-
/* 获取chain的长度 */
-
ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);
-
if (space < remaining) { /* 若长度不够 */
-
(*chainp)->off += space; /* 则当前chain的内存使用完毕 */
-
remaining -= (int)space; /* 剩下的内存空间 */
-
} else {
-
(*chainp)->off += remaining; /* 当前空间已经足够 */
-
buf->last_with_datap = chainp;
-
break;
-
}
-
chainp = &(*chainp)->next;
-
}
-
#else
-
chain->off += n;
-
advance_last_with_data(buf);
-
#endif
-
/* 更新当前实际的有效长度 */
-
buf->total_len += n;
-
buf->n_add_for_cb += n;
-
-
/* Tell someone about changes in this buffer */
-
evbuffer_invoke_callbacks(buf);
-
result = n;
-
done:
-
EVBUFFER_UNLOCK(buf);
-
return result;
-
}
读处理过程中还实现了类似流控的机制,当接受到的报文大于了当前设置的最小门限后,会通告用户进行报文的读取操作:
-
void
-
_bufferevent_run_readcb(struct bufferevent *bufev)
-
{
-
/* Requires that we hold the lock and a reference */
-
struct bufferevent_private *p =
-
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
-
if (bufev->readcb == NULL)
-
return;
-
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
-
p->readcb_pending = 1;
-
if (!p->deferred.queued)
-
SCHEDULE_DEFERRED(p);
-
} else {
-
/* 调用对应的读回调函数,对应的对调函数为bufferevent中注册的用户接口 */
-
bufev->readcb(bufev, bufev->cbarg);
-
}
-
}
在发送报文的过程中通过将函数evbuffer_write_atmost()实现,该函数会尽可能的发送对应的报文:
-
/* 尽最大可能的发送,通常会采用ivec或者sendfile */
-
int
-
evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
-
ev_ssize_t howmuch)
-
{
-
int n = -1;
-
-
EVBUFFER_LOCK(buffer);
-
-
if (buffer->freeze_start) {
-
goto done;
-
}
-
-
/* 发送长度最大为当前buffer的长度 */
-
if (howmuch < 0 || (size_t)howmuch > buffer->total_len)
-
howmuch = buffer->total_len;
-
-
if (howmuch > 0) {
-
#ifdef USE_SENDFILE /* 零拷贝的报文发送处理 */
-
struct evbuffer_chain *chain = buffer->first;
-
if (chain != NULL && (chain->flags & EVBUFFER_SENDFILE))
-
n = evbuffer_write_sendfile(buffer, fd, howmuch);
-
else {
-
#endif
-
#ifdef USE_IOVEC_IMPL
-
n = evbuffer_write_iovec(buffer, fd, howmuch); /* 多缓冲区的数据发送操作,也能提高发送性能 */
-
#elif defined(WIN32)
-
/* XXX(nickm) Don't disable this code until we know if
-
* the WSARecv code above works. */
-
void *p = evbuffer_pullup(buffer, howmuch);
-
n = send(fd, p, howmuch, 0);
-
#else
-
/* 普通的报文发送处理 */
-
void *p = evbuffer_pullup(buffer, howmuch);
-
n = write(fd, p, howmuch);
-
#endif
-
#ifdef USE_SENDFILE
-
}
-
#endif
-
}
-
-
if (n > 0) /* 报文发送完成之后将evbuffer丢弃 */
-
evbuffer_drain(buffer, n);
-
-
done:
-
EVBUFFER_UNLOCK(buffer);
-
return (n);
-
}
从上述的过程中可以发现,在发送报文的流程中存在多种方式,其中主要是零拷贝的发送、多缓冲区发送、普通的发送,在报文发送完成之后将对应的缓存丢弃。使用这的报文处理实际上是通过回调的通告方式,当当前缓存中的报文量少于一定量的情况下就会回调注册的写函数进行报文的发送(使用者将报文写到缓存中的过程)。具体的如下所示:
-
/* 调用用户的发送回调函数 */
-
void
-
_bufferevent_run_writecb(struct bufferevent *bufev)
-
{
-
/* Requires that we hold the lock and a reference */
-
struct bufferevent_private *p =
-
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
-
if (bufev->writecb == NULL)
-
return;
-
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
-
p->writecb_pending = 1;
-
if (!p->deferred.queued)
-
SCHEDULE_DEFERRED(p);
-
} else {
-
/* 调用写的回调函数,在该函数中用户可进行报文的发送处理 */
-
bufev->writecb(bufev, bufev->cbarg);
-
}
-
}
以上的过程基本就完成了报文的接收和发送操作。
该系列的分析都是基于libevent2.0.20.
阅读(5582) | 评论(0) | 转发(0) |