Libevent源码在下载,本系列文章以libevent-2.0.20-stable为例。
一、原理
1、Reactor模式
由于整个Libevent就是一个Reactor,在此不得不先墨迹一下Reactor模型。
Reactor模型是一种事件驱动机制,所有事情都当做事件来处理(例如:Libevent的I/O、定时和信号),应用为事件注册一个回调函数到Reactor上,当事件发生时Reactor调用注册的回调函数以处理相应的事件,就这么简单。
2、关键数据结构
在介绍Libevent事件处理框架之前先介绍下关键的数据结构:
(1)event_base
event_base定义在Event_internal.h/170行,该结构的开头有连个非常重要的字段,分别是:
struct event_base {
const struct eventop *evsel;
void *evbase;
};
Libevent是由C语言编写的,没有C++中类和实例的东东,这里evsel和evbase就是模拟C++中的这种行为,evsel和evbase就好比类和静态函数的关系,比如添加事件时:evsel->add(evbase, ev)相当于class::add(instance, ev),instance就是class的一个实例即evbase之于evsel。
evsel是一个结构体指针,指向全局变量static const struct eventop *eventops[]中的一个。
eventops是何许人也后文书会解说,先卖个关子。
(2)event
Libevent基于事件驱动的,event也就当仁不让地成为整个库的核心了,类似于DOTA中的小黑、骷髅、火枪、月骑之类的
。event提供了回调函数接口供Reactor在事件发生时调用,以处理相应事件,通常其会绑定一个有效的句柄。event定义在Event_struct.h/87行,具体如下:
struct event {
TAILQ_ENTRY(event) ev_active_next;
TAILQ_ENTRY(event) ev_next;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
struct event_base *ev_base;
union {
/* used for io events */
struct {
TAILQ_ENTRY(event) ev_io_next;
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
TAILQ_ENTRY(event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} _ev;
short ev_events;
short ev_res; /* result passed to event callback */
short ev_flags;
ev_uint8_t ev_pri; /* smaller numbers are higher priority */
ev_uint8_t ev_closure;
struct timeval ev_timeout;
/* allows us to adopt for different types of events */
void (*ev_callback)(evutil_socket_t, short, void *arg);
void *ev_arg;
};
从上面源码不难看出,2个union占据了结构体篇幅的半壁江山,其分别代表了Libevent的三驾马车(I/O、定时和信号)。
Libevent用双向链表来存储事件列表,其中,ev_next表示注册的事件,ev_active_next表示激活的事件。
ev_events表示注册的事件类型,其具体值在Event.h/731行,主要如下:
#define EV_TIMEOUT 0x01 // 定时事件
#define EV_READ 0x02 // I/O读事件
#define EV_WRITE 0x04 // I/O写事件
#define EV_SIGNAL 0x08 // 信号事件
#define EV_PERSIST 0x10 // 永久事件
#define EV_ET 0x20 // 边缘触发
上面事件类型可以通过或|运算符来进行组合,以满足自己的关注需求,需要注意的是,信号与I/O事件不能同时设置。
ev_base表示event所属的Reactor实例,memcached就是应用多个实例的典型用例,以后在解析memcached源码时会讲解。
对于I/O事件,ev_fd表示event绑定的文件描述符即socket,对于signal事件,ev_fd表示event绑定的信号。
ev_arg可以是任意数据类型,用于为回调函数绑定event,由于其是void *,因此,可以对其进行扩展,就像用epoll或者IOCP时绑定的数据一样可以带上context。
ev_callback就是为event绑定的回调函数,在其注册的事件触发时,ev_base会调用这个函数,以执行相应的事件处理程序,其原型见上面的event结构体,补全参数如下:
void (*ev_callback)(evutil_socket_t fd, short events, void *arg);
其中,参数fd对应于ev_fd,events对应于ev_events,arg对应于ev_arg。
(3)eventop
eventop实际上是前面提到过的event_base中重要字段evbase的一个实例对象,其定义在Event-internal.h/60行,具体如下:
struct eventop {
const char *name; // I/O多路复用机制名
void *(*init)(struct event_base *); // 初始化event_base
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo); // 注册事件
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo); // 删除事件
int (*dispatch)(struct event_base *, struct timeval *); // 事件分发
void (*dealloc)(struct event_base *); // 释放event_base
int need_reinit;
enum event_method_feature features;
size_t fdinfo_len;
};
eventop的主干是一系列的函数指针,包括初始化和释放evnt_base,注册、删除和分发事件等5个接口函数,所有I/O多路复用机制(Epoll、Select、Poll、KQueue、DevPoll等)都必须实现这5个接口函数,并在初始化的时候将eventop的这5个接口函数指针指向比如Epoll实现的5个函数,以eopll为例见Epoll.c/84行:
const struct eventop epollops = {
"epoll",
epoll_init, // Epoll.c/108行
epoll_nochangelist_add, // Epoll.c/352行
epoll_nochangelist_del, // Epoll.c/370行
epoll_dispatch, // Epoll.c/386行
epoll_dealloc, // Epoll.c/461行
1, /* need reinit */
EV_FEATURE_ET|EV_FEATURE_O1,
0
};
二、流程
支持Libevent运转的就是一个大循环,这个主循环体现在event_base_loop(Event.c/1533)函数里,该函数的执行流程如下:
图1 event_base_loop主循环
上图的简单描述就是:
(1)校正系统当前时间。
(2)将当前时间与存放时间的最小堆中的时间依次进行比较,将所有时间小于当前时间的定时器事件从堆中取出来加入到活动事件队列中。
(3)调用I/O封装(比如:Epoll)的事件分发函数dispatch函数,以当前时间与时间堆中的最小值之间的差值(最小堆取最小值复杂度为O(1))作为Epoll/epoll_wait(Epoll.c/dispatch/407)的timeout值,在其中将触发的I/O和信号事件加入到活动事件队列中。
(4)调用函数event_process_active(Event.c/1406)遍历活动事件队列,依次调用注册的回调函数处理相应事件。
最后附上event_base_loop源码如下:
int event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke user callbacks. */
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (base->running_loop) {
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}
base->running_loop = 1;
clear_time_cache(base);
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base(base);
done = 0;
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif
base->event_gotterm = base->event_break = 0;
while (!done) {
base->event_continue = 0;
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}
timeout_correct(base, &tv);
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
}
/* If we have no events, we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}
/* update last old time */
gettime(base, &base->event_tv);
clear_time_cache(base);
res = evsel->dispatch(base, tv_p);
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
update_time_cache(base);
timeout_process(base);
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));
done:
clear_time_cache(base);
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}