分类: C/C++
2012-06-13 14:17:24
对一个高性能服务器来说,处理速度快和资源占用小是典型特性,尤其是当服务器遇到C10K问题的时候(网络服务器在处理数以万计的客户端连接时,往往出现效率低下甚至完全瘫痪,这被称为C10K问题)。要做到处理速度足够快,其并发模型的设计相当关键,而要做到资源尤其是内存资源的占用少,就要依赖于其资源分配和资源管理的方案设计。
服务器的并发模型设计是网络编程中很关键的一个部分,服务器的并发量取决于两个因素,一个是提供服务的进程数量,另外一个是每个进程可同时处理的并发连接数量。相应的,服务器的并发模型也由两个部分构成:进程模型和连接处理机制。进程模型主要有以下3种模型:
(1)单进程模式:这种模式的服务器称为迭代服务器,实现最简单,也没有进程控制的开销,cpu利用率最高,但是所有的客户连接请求排队等待处理,如果有一条连接时长过大,则其他请求就会被阻塞甚至被丢弃,这种模型也很容易被攻击,一般很少使用这种模型;
(2)多进程并发模式:这种模式由master进程启动监听并接受连接,然后为每个客户连接请求fork一个worker子进程处理客户请求,这种模式也比较简单,但是为每个客户连接请求fork一个子进程比较耗费cpu时间,而且子进程过多的情况下可能会用尽内存,导致开始对换,整体性能急降,这种模型在小并发的情况下比较常用,比如每天处理几千个客户请求的情况;
(3)prefork模式:master进程监听客户端连接请求并持续监视可用子进程数量,低于阀值则fork额外的子进程,高于阀值则kill掉一些过剩的子进程。这种模式根据accept的具体情形又可以分为三种变体:
master 负责listen,每个worker子进程独自accept,accept无上锁。所有worker阻塞于同一个监听套接字上睡眠,当有新的客户连接请求时,内核会唤醒所有等待该事件的睡眠worker子进程,最先执行的worker将获得连接套接字并处理请求,这种模型会导致惊群问题,尽管只有一个子进程将获得连接,但是所有子进程都会被唤醒,子进程越多,惊群问题对于性能的影响就越大。另一方面,如果每个worker不是阻塞于accept而是阻塞于select,则很容易造成select冲突问题,这种情况的性能损耗更大,所以这种模型一般都是直接阻塞于accept,不阻塞于select;
master 负责listen,每个worker子进程独自accpet,accept有上锁。这种模型解决了惊群问题,只有一个worker阻塞于accpet,其余worker都阻塞于获取锁资源,上锁可以使用文件上锁或者使用共享内存的互斥锁,这种模型的cpu耗时略高于第一种模型。这两种模型都是由内核负责把客户端连接请求交由某个worker,客户连接请求的处理比较均匀,因为内核使用了公平的进程切换方式;
master负责listen 和accpet,通过某种方式把获得的连接套接字交给一个空闲worker。这种模型下的master必须管理所有worker子进程的状态,并且要使用某种方式的进程间通信方式传递套接字给子进程,比如采用socketpair创建字节流管道用于传递。相对于上面两种模型而言,这种模型复杂度高一些,cpu耗时也更高,并且子进程的分配也由master负责,是否均匀取决于master。
以上的进程模型都假定了两个条件,即套接字是阻塞的,并且每个客户连接请求对应一个子进程。这也就意味着如果同时并发量很高的时候,比如超过1万的并发量,就要有1万个worker子进程同时服务,内存耗光后,服务器性能急剧下降。这些模型基本上只能服务于并发量很低的情况,一般在1千以内勉强过得去(还依赖于每个处理的消耗)。
一个自然的解决办法就是把进程与连接的比例从1:1变成m:n。m=1、n>1的情况下,一个worker进 程可以处理多个连接请求,这样对于每个客户端连接的处理就不能是全程阻塞的了。可以把每个客户端连接的处理分为若干过程,每个过程都是一个状态,这样就可 以把对一个客户的连接请求处理分解成若干步骤。如果把每个客户请求的处理分开为不同的阶段,就可以在一个子进程内或者一批子进程间并发的处理更多的连接请 求了,并且可以更好的控制资源的分配和管理,将资源的消耗降到一定的低水平,这样也就等于提高了服务器的整体并发能力。下面介绍一下并发模型的连接处理机 制,这个机制的关键是IO模型。一般有五种典型的IO模型:
(1)阻塞IO模型:当套接口是阻塞的,所有的输入操作(调用connect, accept, read, recvfrom, recv, recvmsg等输入函数)发起后会阻塞到两个步骤完成才会返回;
(2)非阻塞IO模型:当套接口是非阻塞的,所有的输入操作在第一个步骤立即返回,这个时候一般需要轮询检查(循环调用输入函数),当数据准备好或者连接已经建立进入第二步的情况下,调用的输入函数将阻塞到第二步完成为止;
(3)IO复用模型:当在等待多个套接口的输入时,可以调用select、poll等IO复用函数监听这些套接口的输入事件,进程会阻塞在这些调用上,直到有一个或者多个套接口的输入事件发生,也即完成了第一步,IO复用函数会返回这些套接口,接着进程可以调用输入函数完成这些套接口的第二步;
(4)信号驱动IO模型:创建套接口的时候,开启套接口的信号驱动IO功能,并安装一个信号处理函数,处理SIGIO信号。当套接口完成了第一步时,会发送SIGIO信号通知进程处理,进程在信号处理函数中完成第二步;
(5)异步IO模型:告诉内核启动某个输入操作,并让内核在完成了输入操作之后(两个步骤都完成)通知进程。
前3种模型在所有的操作系统都支持,而后两种模型很少操作系统实现,前4种IO模型都会导致进程阻塞,直到IO操作完成,属于同步IO操作,只有异步IO模型不导致进程阻塞,是异步IO操作。
IO 复用模型中,select和poll一般所有的操作系统都会支持,但是每次等待都要设置需要等待的套接口,并且内部的实现不够高效,很难支持监听高并发量的套接口集。不同的操作系统使用了不同的高级轮询技术来支持高性能的监听,一般这些方式都不是可移植的,比如freebsd上实现了 kqueue,solaris实现了/dev/poll,linux实现了epoll等等。nginx针对不同的操作系统,定制了不同的IO处理机制,一般都会采用操作系统的高性能接口。
2、基本思想为了追求高并发和快速响应,并发连接是任何服务端程序都逃不掉的重要性能指标,如何处理大量并发连接无疑是服务器端程序设计时所要考虑的第一问题。nginx采用的是大部分HTTP服务器的做法,即master-worker模型,一个master进程管理一个或者多个worker进程,基本的事件处理都是放在worker进程,master负责一些全局初始化,以及对worker进程的管理。
在nginx中,master进程和worker进程的通信主要是通过socketpair来实现的,每当fork完一个子进程之后,就将这个子进程的socketpair句柄传递给前面已经存在的子进程,这样子进程之间也就可以通信了。Nginx中fork子进程的函数是ngx_spawn_process()。主要实现代码在ngx_process.h和ngx_process.c文件中。
3、数据结构 ngx_process_t
typedef struct {
ngx_pid_t pid; //进程id
int status; //进程的退出状态(主要在waitpid中进行处理)
ngx_socket_t channel[2]; //socketpair创建的一对socket句柄
ngx_spawn_proc_pt proc; //进程的执行函数
void *data; //proc的参数
char *name;
unsigned respawn:1; //进程的状态,重新创建
unsigned just_respawn:1; //进程的状态,第一次创建
unsigned detached:1; //进程的状态,分离
unsigned exiting:1; //进程的状态,正在退出
unsigned exited:1; //进程的状态,已经退出
} ngx_process_t;
ngx_channel_ttypedef struct {
ngx_uint_t command; //要发送的命令
ngx_pid_t pid; //发送方的进程id
ngx_int_t slot; //发送方进程在进程表中的偏移位置
ngx_fd_t fd; //发送给对方的句柄
} ngx_channel_t;
ngx_cycle_sstruct ngx_cycle_s {
void ****conf_ctx;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_log_t new_log;
ngx_connection_t **files;
ngx_connection_t *free_connections;
ngx_uint_t free_connection_n;
ngx_queue_t reusable_connections_queue;
ngx_array_t listening;
ngx_array_t pathes;
ngx_list_t open_files;
ngx_list_t shared_memory;
ngx_uint_t connection_n;
ngx_uint_t files_n;
ngx_connection_t *connections;
ngx_event_t *read_events;
ngx_event_t *write_events;
ngx_cycle_t *old_cycle;
ngx_str_t conf_file;
ngx_str_t conf_param;
ngx_str_t conf_prefix;
ngx_str_t prefix;
ngx_str_t lock_file;
ngx_str_t hostname;
};
备注:在nginx中,一个cycle代表一个进程,所有进程相关变量(包括连接)都在这个结构体中。
ngx_listening_sstruct ngx_listening_s {
ngx_socket_t fd; //监听套接字的套接字描述符
struct sockaddr *sockaddr; //监听套接口地址结构
socklen_t socklen;
size_t addr_text_max_len;
ngx_str_t addr_text;
int type; //SOCK_STREAM
int backlog;
int rcvbuf; // 监听套接口的接收缓冲区长度
int sndbuf; // 监听套接口的发送缓冲区长度
ngx_connection_handler_pt handler;
void *servers;
ngx_log_t log;
ngx_log_t *logp;
size_t pool_size;
size_t post_accept_buffer_size;
ngx_msec_t post_accept_timeout;
ngx_listening_t *previous;
ngx_connection_t *connection; // 监听也是一个连接,要分配给监听一个连接资源
unsigned open:1;
unsigned remain:1;
unsigned ignore:1;
unsigned bound:1;
unsigned inherited:1;
unsigned nonblocking_accept:1;
unsigned listen:1;
unsigned nonblocking:1;
unsigned shared:1;
unsigned addr_ntop:1;
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
unsigned ipv6only:2;
#endif
#if (NGX_HAVE_DEFERRED_ACCEPT)
unsigned deferred_accept:1;
unsigned delete_deferred:1;
unsigned add_deferred:1;
#ifdef SO_ACCEPTFILTER
char *accept_filter;
#endif
#endif
#if (NGX_HAVE_SETFIB)
int setfib;
#endif
};
ngx_connection_sstruct ngx_connection_s {
void *data;
ngx_event_t *read; //读事件
ngx_event_t *write; //写事件
ngx_socket_t fd; //连接套接口的套接口描述字
ngx_recv_pt recv;
ngx_send_pt send;
ngx_recv_chain_pt recv_chain;
ngx_send_chain_pt send_chain;
ngx_listening_t *listening; //该连接对应的监听
off_t sent;
ngx_log_t *log;
ngx_pool_t *pool;
struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t addr_text;
#if (NGX_SSL)
ngx_ssl_connection_t *ssl;
#endif
struct sockaddr *local_sockaddr;
ngx_buf_t *buffer;
ngx_queue_t queue;
ngx_atomic_uint_t number;
ngx_uint_t requests;
unsigned buffered:8;
unsigned log_error:3; /* ngx_connection_log_error_e */
unsigned single_connection:1;
unsigned unexpected_eof:1;
unsigned timedout:1;
unsigned error:1;
unsigned destroyed:1;
unsigned idle:1;
unsigned reusable:1;
unsigned close:1;
unsigned sendfile:1;
unsigned sndlowat:1;
unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
#if (NGX_HAVE_IOCP)
unsigned accept_context_updated:1;
#endif
#if (NGX_HAVE_AIO_SENDFILE)
unsigned aio_sendfile:1;
ngx_buf_t *busy_sendfile;
#endif
#if (NGX_THREADS)
ngx_atomic_t lock;
#endif
};
4、具体实现nginx的进程启动过程是在ngx_master_process_cycle()函数中实现的,单进程是通过ngx_single_process_cycle()函数中完成,在多进程模型中,会根据配置文件的worker_processes值创建多个子进程,即一个master和多个worker子进程。进程之间、进程与外部之间保持通信,进程之间是通过socketpair进行通信的,进程与外部之间是通过信号通信的。
master进程主要进行一些全局性的初始化工作和管理worker子进程的工作,事件处理是在worker子进程中进行的。进程启动过程中,有些全局数据会被设置,最重要的是进程表ngx_processes,master进程没创建一个worker子进程,都会把一个设置好的ngx_process_t结构变量放入ngx_processes中,进程表长度是1024。
ngx_open_listening_sockets(cycle)主要功能:读取配置文件,绑定、监听服务端口。
ngx_int_t
ngx_open_listening_sockets(ngx_cycle_t *cycle)
{
。。。
for (tries = 5; tries; tries--) {
failed = 0;
/* for each listening socket */
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0); //socket
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuseaddr, sizeof(int)) //setsockopt
== -1)
{
}
#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
if (ls[i].sockaddr->sa_family == AF_INET6 && ls[i].ipv6only) {
}
#endif
if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) { //bind
}
#if (NGX_HAVE_UNIX_DOMAIN)
if (ls[i].sockaddr->sa_family == AF_UNIX) {
}
#endif
if (listen(s, ls[i].backlog) == -1) { //listen
}
ls[i].listen = 1;
ls[i].fd = s;
}//for cycle
if (!failed) {
break;
}
ngx_msleep(500);
} //for tries=5
if (failed) {
ngx_log_error(NGX_LOG_EMERG, log, 0, "still could not bind()");
return NGX_ERROR;
}
return NGX_OK;
}
备注:可以看到ngx_init_cycle()里的ngx_open_listening_sockets()主要功能是socket、bind和listen函数的调用,最终创建完的监听套接字就在cycle结构体的listening域里。
ngx_master_process_cycle(ngx_cycle_t *cycle)void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{
//添加信号集
sigemptyset(&set);
sigaddset(&set, SIGCHLD);
sigaddset(&set, SIGALRM);
sigaddset(&set, SIGIO);
sigaddset(&set, SIGINT);
sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));
if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}
sigemptyset(&set);
。。。
//根据配置文件,启动子进程,子进程进入自己的事件循环
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN);
//master进程进入自己的事件循环,即接收信号、管理worker进程
for ( ;; ) {
。。。
}
}
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)主要功能:创建worker子进程。
static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
for (i = 0; i < n; i++) {
cpu_affinity = ngx_get_cpu_affinity(i);
ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
"worker process", type);
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch);
}
}
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,char *name, ngx_int_t respawn)ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
char *name, ngx_int_t respawn)
{
。。。
pid = fork();
switch (pid) {
case -1:
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"fork() failed while spawning \"%s\"", name);
ngx_close_channel(ngx_processes[s].channel, cycle->log);
return NGX_INVALID_PID;
case 0: //子进程
ngx_pid = ngx_getpid();
proc(cycle, data); //子进程进入自己的事件循环
break;
default: //父进程
break;
}
。。。
}
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_uint_t i;
ngx_connection_t *c;
ngx_process = NGX_PROCESS_WORKER;
//初始化,并设置子进程title
ngx_worker_process_init(cycle, 1);
ngx_setproctitle("worker process");
。。。
//子进程自己的事件循环
for ( ;; ) {
//退出状态已设置,关闭所有连接
if (ngx_exiting) {
c = cycle->connections;
for (i = 0; i < cycle->connection_n; i++) {
if (c[i].fd != -1 && c[i].idle) {
c[i].close = 1;
c[i].read->handler(c[i].read);
}
}
if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
{
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_worker_process_exit(cycle);
}
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
//处理事件和计时
ngx_process_events_and_timers(cycle);
。。。
}
}
备注:worker进程的事件循环就是监听网络事件并处理(如新建连接、断开连接、处理请求、发送响应等),所以真正的连接最终是连到了worker进程,但是worker进程之间是怎么调用accept()函数呢?
所有的worker进程都有监听套接字,都能够accept一个连接,但是nginx准备了一个accept锁,因此所有的子进程在走到处理新连接这一步的时候都要争下accept锁,争到锁的worker进程可以调用accept()并接受新连接。
这样做的目的就是为了防止多个进程同时accept,当一个连接来的时候多个进程同时被唤起,即惊群。
ngx_process_events_and_timers(ngx_cycle_t *cycle)函数功能:事件循环的核心。
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
// 如果配置文件中设置了时间精度
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
...
}
// ngx_use_accept_mutex变量代表是否使用accept互斥体,默认使用,accept_mutex off,指令关闭。accept mutex的作用就是避免惊群,同时实现负载均衡。
if (ngx_use_accept_mutex) {
// ngx_accept_disabled变量在ngx_event_accept函数中计算。如果ngx_accept_disabled大于0,就表示该进程接受的连接过多,因此就放弃一次争抢accept mutex的机会,同时将
自己减1。然后,继续处理已有连接上的事件。nginx就借用此变量实现了进程关于连接的基本负载均衡。
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
// 尝试加锁accept mutex,只有成功获取锁的进程,才会将listen套接字放入epool中,因此保证了只有一个进程拥有监听套接口,故所有进程阻塞在epool_wait时,不会出现惊群现象。
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
//获取锁的进程,将添加一个NGX_POST_EVENTS标志,此标志的作用是将所有产生的事件放入一个队列中,等释放锁后,再慢慢来处理事件。因为,处理事件可能会很耗时,如果不先释放锁再处理的话,该进程就长时间霸占了锁,导致其他进程无法获取锁,这样accept
的效率就低了。
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
//设置最长延迟多久,再去争抢锁
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
// 调用process_events钩子轮询事件,有些事件即时调用事件处理函数处理,有些事件放入延迟队列等待后面处理,ngx_process_events的具体实现是对应到epoll模块中的ngx_epoll_process_events函数
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
// 有需要延迟处理的监听套接口事件
if (ngx_posted_accept_events) {
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
}
// 释放锁
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
//delta是上文对epool wait事件的耗时统计,存在毫秒级的耗时就对所有事件的timer进行检查,如果time out就从timer rbtree中删除到期的timer,同时调用相应事件的handler函数完成处理
if (delta) {
ngx_event_expire_timers();
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted events %p", ngx_posted_events);
// 有需要延迟处理的数据套接口事件
if (ngx_posted_events) {
// 处理
if (ngx_threaded) {
ngx_wakeup_worker_thread(cycle);
} else {
ngx_event_process_posted(cycle, &ngx_posted_events);
}
}
}