1. 惊群
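设想一下:某服务器程序共开启 5 个线程来监听客户端的连接,这 5 个线程很可能同时阻塞于 accept(或 epoll_wait)。当客户端请求连接时,内核会唤醒这些线程,5 个线程同时去竞争这一个 connect 请求,但最终只有一个线程能够 accept 成功,其余 4 个 accept 失败并返回错误(accept error: Resource temporarily unavailable,即非阻塞 socket 上的 EAGAIN)。这就是服务器多线程的惊群问题。(注:较新的 Linux 内核(2.6 及以后)对阻塞在同一监听 socket 上的 accept 只会唤醒一个等待者;惊群主要出现在多个进程/线程对同一监听 socket 调用 epoll_wait 的场景。)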
这对系统资源来说,是一种浪费。实际上同一时刻,只需要一个线程处于阻塞状态即可。
2. 惊群问题的解决
(1)只有一个进程/线程使用accept函数:启动一个专门的进程/线程来等待客户端的连接。当客户端有connect请求时,该进程/线程就将该connect请求放入到专门的队列中,同时去通知其他的某个进程/线程来处理该事件。
(2)多个进程/线程都使用accept函数,但对这些accept函数使用锁。保证在某个时刻,只有一个进程/线程处于accept的阻塞状态。Nginx就是使用的这种方法。
3. Nginx代码分析
Nginx 采用多进程模型:各进程通过共享内存中的锁变量,使用 __sync_bool_compare_and_swap() 原子操作实现互斥,保证同一时刻只有一个进程在等待客户端的连接。
当有客户端 connect 时,服务器进程应尽可能缩短持有锁的时间,否则会影响后续新客户端的 connect 请求。为此 Nginx 设置了 2 个延迟处理队列(ngx_posted_accept_events 与 ngx_posted_events),分别用于延迟处理 accept 事件和 epoll 的普通读写事件。持有锁的进程在 epoll_wait 返回后,先把事件分别放入这两个队列;接着处理 ngx_posted_accept_events 队列中的连接请求,然后释放锁,最后再处理 ngx_posted_events 队列中的普通读写事件。
(1)Nginx是多进程的,所以其采用共享内存的方法。
-
// (1) Mutex structure.
// `lock` and `wait` are pointers: ngx_shmtx_create() points them at the
// fields of a ngx_shmtx_sh_t cell located in the shared memory region.
typedef struct {
ngx_atomic_t *lock;      // lock word; ngx_shmtx_trylock() treats 0 as free and stores ngx_pid on success
ngx_atomic_t *wait;      // bound to the shared cell only when spin != (ngx_uint_t) -1 at create time
ngx_uint_t semaphore;    // set to 1 by ngx_shmtx_create() when sem_init() succeeds
sem_t sem;               // initialised with sem_init(&sem, 1, 0) by ngx_shmtx_create()
ngx_uint_t spin;         // (ngx_uint_t) -1 = skip semaphore setup; otherwise set to 2048 (presumably a spin count - confirm)
} ngx_shmtx_t;
ngx_shmtx_t ngx_accept_mutex; // the accept mutex shared by the worker processes
-
// (2) Inside ngx_shm_alloc(): allocate the shared memory as an anonymous
// MAP_SHARED mapping (readable and writable). Presumably this runs before the
// worker processes are forked so that all of them see the same pages - confirm
// against the caller.
// NOTE(review): this excerpt does not compare the mmap() result with MAP_FAILED.
shm->addr = (u_char *) mmap(NULL, shm->size, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);
-
// (3) Inside ngx_event_module_init(): allocate the shared memory and bind the
// accept mutex to it. Excerpt only - `shm`, `size`, `shared` and `cycle` are
// declared in the surrounding function, which is not shown.
-
shm.size = size;
-
// sizeof on a string literal counts the terminating NUL, so len is one more
// than the number of visible characters in the name.
shm.name.len = sizeof("nginx_shared_zone");
-
shm.name.data = (u_char *) "nginx_shared_zone";
-
shm.log = cycle->log;
-
-
if (ngx_shm_alloc(&shm) != NGX_OK) { // ngx_shm_alloc() is the mmap()-based allocator from (2)
-
return NGX_ERROR;
-
}
-
-
shared = shm.addr;
-
-
// ngx_accept_mutex_ptr points at the very start of the shared region.
ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;
-
// A spin of (ngx_uint_t) -1 makes ngx_shmtx_create() return right after it
// sets ->lock, so no semaphore is set up for the accept mutex.
ngx_accept_mutex.spin = (ngx_uint_t) -1;
-
-
// Point ngx_accept_mutex.lock into the same shared region.
if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,
-
cycle->lock_file.data)
-
!= NGX_OK)
-
{
-
return NGX_ERROR;
-
}
/*
 * (4) Bind a mutex to its cell in shared memory.
 *
 * mtx->lock is always pointed at addr->lock.  If the caller pre-set
 * mtx->spin to (ngx_uint_t) -1 nothing else is done.  Otherwise spin is set
 * to 2048, mtx->wait is pointed at addr->wait, and a process-shared POSIX
 * semaphore with initial value 0 is created; mtx->semaphore becomes 1 only
 * when sem_init() succeeds.  A sem_init() failure is logged, not reported.
 * `name` is not used here.
 *
 * Always returns NGX_OK.
 */
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;

    if (mtx->spin != (ngx_uint_t) -1) {
        mtx->spin = 2048;
        mtx->wait = &addr->wait;

        if (sem_init(&mtx->sem, 1, 0) != -1) {
            mtx->semaphore = 1;
        } else {
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed");
        }
    }

    return NGX_OK;
}
(2)试着获取锁
点击(此处)折叠或打开
-
/*
 * Atomic compare-and-swap on *lock: when *lock equals `old` it is replaced by
 * `set` and the expression yields true (1); otherwise *lock is left unchanged
 * and it yields false (0).  Maps directly onto the GCC/Clang __sync builtin.
 */
#define ngx_atomic_cmp_set(lock, old, set) \
    __sync_bool_compare_and_swap((lock), (old), (set))
-
// 若成功获取锁,就将本进程ID,写入到共享内存区
-
ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx)
-
{
-
return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
-
}
/*
 * Try to become (or stay) the worker that accepts new connections.
 *
 * - Mutex not obtained: if this worker held it on the previous attempt, its
 *   accept events are disabled and ngx_accept_mutex_held is cleared.
 * - Mutex obtained while already held, with ngx_accept_events == 0 and
 *   without NGX_USE_RTSIG_EVENT: nothing to do.
 * - Mutex obtained otherwise: the accept events (the listening sockets' read
 *   events) are enabled in epoll.  If that fails the mutex is released again;
 *   on success ngx_accept_events is reset and ngx_accept_mutex_held set.
 *
 * Returns NGX_OK, or NGX_ERROR when enabling/disabling accept events fails.
 */
ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (!ngx_shmtx_trylock(&ngx_accept_mutex)) {
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex lock failed: %ui", ngx_accept_mutex_held);

        /* Someone else owns the mutex; stop accepting if we still were. */
        if (!ngx_accept_mutex_held) {
            return NGX_OK;
        }

        if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
        return NGX_OK;
    }

    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked");

    /* Held since the previous attempt: the accept events are left as they are. */
    if (ngx_accept_mutex_held
        && ngx_accept_events == 0
        && !(ngx_event_flags & NGX_USE_RTSIG_EVENT))
    {
        return NGX_OK;
    }

    if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
        return NGX_ERROR;
    }

    ngx_accept_events = 0;
    ngx_accept_mutex_held = 1;

    return NGX_OK;
}
(3)Nginx处理定时器&事件的函数
点击(此处)折叠或打开
-
// 处理事件 & 定时
-
void ngx_process_events_and_timers(ngx_cycle_t *cycle)
-
{
-
ngx_uint_t flags;
-
ngx_msec_t timer, delta;
-
-
if (ngx_timer_resolution) {
-
timer = NGX_TIMER_INFINITE; // 超时 时间
-
flags = 0;
-
-
} else {
-
timer = ngx_event_find_timer();
-
flags = NGX_UPDATE_TIME;
-
}
-
-
if (ngx_use_accept_mutex) {//使用accept_mutex来解决惊群问题
-
if (ngx_accept_disabled > 0) { //大于本进程最大链接数的7/8,放弃获取锁(本进程的连接数过多)
-
ngx_accept_disabled--;
-
-
} else {
-
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { //试着获取锁
-
return;
-
}
-
-
if (ngx_accept_mutex_held) {//拿到锁
-
flags |= NGX_POST_EVENTS;// event事件延迟处理,先处理 accept事件
-
} else {
-
if (timer == NGX_TIMER_INFINITE // 不超时
-
|| timer > ngx_accept_mutex_delay)
-
{
-
timer = ngx_accept_mutex_delay; //要等待 ngx_accept_mutex_delay 后再去抢锁
-
}
-
}
-
}
-
}
-
-
delta = ngx_current_msec;
-
-
(void) ngx_process_events(cycle, timer, flags); // ngx_epoll_process_events
-
-
delta = ngx_current_msec - delta;
-
-
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta);
-
-
if (ngx_posted_accept_events) {//延时处理accept事件
-
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
-
}
-
-
if (ngx_accept_mutex_held) {// 拥有锁,赶紧释放掉
-
ngx_shmtx_unlock(&ngx_accept_mutex);
-
}
-
// 看是否有 定时器信息
-
if (delta) {
-
ngx_event_expire_timers();
-
}
-
-
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted events %p", ngx_posted_events);
-
-
if (ngx_posted_events) { // 延时处理事件
-
if (ngx_threaded) {
-
ngx_wakeup_worker_thread(cycle);
-
} else {
-
ngx_event_process_posted(cycle, &ngx_posted_events);
-
}
-
}
-
}
(4)epoll处理事件
点击(此处)折叠或打开
-
// epoll module's event-processing function.
// Waits up to `timer` ms in epoll_wait() and dispatches every returned event:
// either by calling the event's handler directly or - when `flags` contains
// NGX_POST_EVENTS - by queuing it on ngx_posted_accept_events (accept events)
// or ngx_posted_events (everything else) for the caller to run later.
// Returns NGX_OK, or NGX_ERROR when epoll_wait() fails (other than an EINTR
// while ngx_event_timer_alarm is set) or reports no events with an infinite timer.
-
static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
-
{
-
int events;
-
uint32_t revents;
-
ngx_int_t instance, i;
-
ngx_uint_t level;
-
ngx_err_t err;
-
ngx_event_t *rev, *wev, **queue;
-
ngx_connection_t *c;
-
-
/* NGX_TIMER_INFINITE == INFTIM */
-
-
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer);
-
-
events = epoll_wait(ep, event_list, (int) nevents, timer);
-
-
err = (events == -1) ? ngx_errno : 0; // capture errno immediately, before later calls can overwrite it
-
-
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
-
ngx_time_update(); // refresh the cached time
-
}
-
-
if (err) {
-
if (err == NGX_EINTR) {
-
-
// Interrupted by the timer alarm: not an error, just clear the flag.
if (ngx_event_timer_alarm) {
-
ngx_event_timer_alarm = 0;
-
return NGX_OK;
-
}
-
-
level = NGX_LOG_INFO;
-
-
} else {
-
level = NGX_LOG_ALERT;
-
}
-
-
ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
-
return NGX_ERROR;
-
}
-
-
// No events is only legitimate when the wait had a finite timeout.
if (events == 0) {
-
if (timer != NGX_TIMER_INFINITE) {
-
return NGX_OK;
-
}
-
-
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
-
"epoll_wait() returned no events without timeout");
-
return NGX_ERROR;
-
}
-
-
// Guards the posted-event queues while events are queued below
// (presumably only meaningful in the threaded build - confirm).
ngx_mutex_lock(ngx_posted_events_mutex);
-
-
for (i = 0; i < events; i++) {
-
c = event_list[i].data.ptr;
-
-
// The low bit of data.ptr carries the connection's "instance" bit;
// strip it to recover the real ngx_connection_t pointer.
instance = (uintptr_t) c & 1;
-
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
-
-
rev = c->read;
-
-
if (c->fd == -1 || rev->instance != instance) {
-
-
/*
-
* the stale event from a file descriptor
-
* that was just closed in this iteration
-
*/
-
-
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c);
-
continue;
-
}
-
-
revents = event_list[i].events; // EPOLLIN, EPOLLOUT, EPOLLHUP, EPOLLERR
-
-
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p", c->fd, revents, event_list[i].data.ptr);
-
-
if (revents & (EPOLLERR|EPOLLHUP)) {
-
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD", c->fd, revents);
-
}
-
-
if ((revents & (EPOLLERR|EPOLLHUP))
-
&& (revents & (EPOLLIN|EPOLLOUT)) == 0)
-
{
-
/*
-
* if the error events were returned without EPOLLIN or EPOLLOUT,
-
* then add these flags to handle the events at least in one
-
* active handler
-
*/
-
-
revents |= EPOLLIN|EPOLLOUT;
-
}
-
-
if ((revents & EPOLLIN) && rev->active) { // readable, and the read event is marked active
-
// NGX_POST_THREAD_EVENTS and not an accept event: set posted_ready instead of ready
-
if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
-
rev->posted_ready = 1;
-
-
} else {
-
rev->ready = 1; // mark the read event ready
-
}
-
-
if (flags & NGX_POST_EVENTS) { // deferred mode: queue the event instead of running it
-
queue = (ngx_event_t **) (rev->accept ? // accept events get their own queue
-
&ngx_posted_accept_events : &ngx_posted_events);
-
-
ngx_locked_post_event(rev, queue); // only queued here; the caller drains the queue later
-
-
} else { // immediate mode
-
rev->handler(rev); // invoke the event's callback
-
}
-
}
-
-
wev = c->write;
-
-
if ((revents & EPOLLOUT) && wev->active) {
-
-
// Re-checked because the read handler just run may have closed c
// (leaving c->fd == -1) during this same iteration.
if (c->fd == -1 || wev->instance != instance) {
-
-
/*
-
* the stale event from a file descriptor
-
* that was just closed in this iteration
-
*/
-
-
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c);
-
continue;
-
}
-
-
if (flags & NGX_POST_THREAD_EVENTS) {
-
wev->posted_ready = 1;
-
-
} else {
-
wev->ready = 1;
-
}
-
-
// Write events always go to the ordinary posted queue in deferred mode.
if (flags & NGX_POST_EVENTS) {
-
ngx_locked_post_event(wev, &ngx_posted_events);
-
-
} else {
-
wev->handler(wev); // handle the write event
-
}
-
}
-
}
-
-
ngx_mutex_unlock(ngx_posted_events_mutex);
-
-
return NGX_OK;
-
}
(5)event被延迟处理
点击(此处)折叠或打开
-
/*
 * (5) Drain a posted-event queue.
 *
 * The current head of *posted is unlinked with ngx_delete_posted_event() and
 * its handler is run; the head is re-read on every pass and the function only
 * returns once *posted is NULL, so anything a handler adds to this queue is
 * processed before returning.  Each inspected head (including the final NULL)
 * is logged at debug level.
 */
void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_thread_volatile ngx_event_t **posted)
{
    ngx_event_t *head;

    do {
        head = (ngx_event_t *) *posted;
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", head);

        if (head != NULL) {
            ngx_delete_posted_event(head);
            head->handler(head);
        }
    } while (head != NULL);
}
阅读(3283) | 评论(0) | 转发(0) |