Chinaunix首页 | 论坛 | 博客
  • 博客访问: 157849
  • 博文数量: 22
  • 博客积分: 425
  • 博客等级: 下士
  • 技术积分: 350
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-15 09:43
个人简介

专注服务器设计、开发; https://github.com/justscu;

文章分类

全部博文(22)

文章存档

2014年(10)

2013年(2)

2012年(10)

分类: 服务器与存储

2014-04-21 20:32:56

1. 惊群

    设想下,某服务器程序共开启5个线程来监听客户端的连接,很可能这5个线程同时阻塞于accept(或者是epoll_wait)。当客户端请求连接时,内核会唤醒这些线程,5个线程会同时去竞争这一个用户的connect请求。但最终只有一个线程能够accept成功,4个accpet失败,返回错误(accept error, Resource temporarily unavailable)。这就是服务器多线程惊群问题。
    这对系统资源来说,是一种浪费。实际上同一时刻,只需要一个线程处于阻塞状态即可。

2. 惊群问题的解决

(1)只有一个进程/线程使用accept函数:启动一个专门的进程/线程来等待客户端的连接。当客户端有connect请求时,该进程/线程就将该connect请求放入到专门的队列中,同时去通知其他的某个进程/线程来处理该事件。
(2)多个进程/线程都使用accept函数,但对这些accept函数使用锁。保证在某个时刻,只有一个进程/线程处于accept的阻塞状态。Nginx就是使用的这种方法。

3. Nginx代码分析

    Nginx多进程,共享内存,多个进程使用__sync_bool_compare_and_swap()来进行互斥,在同一时刻,保证只有一个进程在等待客户端的连接。
    当有客户端connect时,服务器进程接受连接客户端连接时,应该尽可能少时间的占用锁,否则会影响新的客户端的connect请求。在Nginx中,有2个延迟处理队列(ngx_posted_accept_events与ngx_posted_events),分别延迟处理accept事件和epoll的普通读写事件。服务器进程先将accept事件放入到ngx_posted_accept_events队列中,然后释放锁,最后再处理ngx_posted_accept_events队列中的连接请求。

(1)Nginx是多进程的,所以其采用共享内存的方法。

点击(此处)折叠或打开

  1. // (1)互斥结构
    typedef struct {
        ngx_atomic_t  *lock;
        ngx_atomic_t  *wait;
        ngx_uint_t     semaphore;
        sem_t          sem;
        ngx_uint_t     spin;
    } ngx_shmtx_t;

    ngx_shmtx_t           ngx_accept_mutex; // 互斥量

 // (2)分配共享内存,ngx_shm_alloc()函数中,
    shm->addr = (u_char *) mmap(NULL, shm->size, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0);

  1. // (3)ngx_event_module_init()函数中,分配共享内存
  2.     shm.size = size;
  3.     shm.name.len = sizeof("nginx_shared_zone");
  4.     shm.name.data = (u_char *) "nginx_shared_zone";
  5.     shm.log = cycle->log;

  6.     if (ngx_shm_alloc(&shm) != NGX_OK) { // 调用(2)中的函数mmap
  7.         return NGX_ERROR;
  8.     }

  9.     shared = shm.addr;

  10.     ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;
  11.     ngx_accept_mutex.spin = (ngx_uint_t) -1;

  12.     if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,
  13.                          cycle->lock_file.data)
  14.         != NGX_OK)
  15.     {
  16.         return NGX_ERROR;
  17.     }

// (4)将共享内存单元的地址,赋值给互斥量
ngx_int_t  ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock; // 指向共享内存单元

    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }

    mtx->spin = 2048;
    mtx->wait = &addr->wait;
    if (sem_init(&mtx->sem, 1, 0) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed");
    } else {
        mtx->semaphore = 1;
    }

    return NGX_OK;
}

(2)试着获取锁
点击(此处)折叠或打开
  1. #define ngx_atomic_cmp_set(lock, old, set)   __sync_bool_compare_and_swap(lock, old, set)

  2. // 若成功获取锁,就将本进程ID,写入到共享内存区
  3. ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx)
  4. {
  5.     return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
  6. }
// 尝试着获取锁
ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    //成功获得锁,将enable accept事件
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked");

        if (ngx_accept_mutex_held
            && ngx_accept_events == 0
            && !(ngx_event_flags & NGX_USE_RTSIG_EVENT))
        {
            return NGX_OK;
        }
    // enable accept, 将accept的句柄(read事件),加入到epoll
        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1; // 获得锁,标志置1

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    // 没有获得锁,disable accept事件
    if (ngx_accept_mutex_held) { 
        if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0; // 没有获得锁,标志置0
    }


    return NGX_OK;
}
(3)Nginx处理定时器&事件的函数
点击(此处)折叠或打开
  1. // 处理事件 & 定时
  2. void ngx_process_events_and_timers(ngx_cycle_t *cycle)
  3. {
  4.     ngx_uint_t flags;
  5.     ngx_msec_t timer, delta;

  6.     if (ngx_timer_resolution) {
  7.         timer = NGX_TIMER_INFINITE; // 超时 时间
  8.         flags = 0;

  9.     } else {
  10.         timer = ngx_event_find_timer();
  11.         flags = NGX_UPDATE_TIME;
  12.     }

  13.     if (ngx_use_accept_mutex) {//使用accept_mutex来解决惊群问题
  14.         if (ngx_accept_disabled > 0) { //大于本进程最大链接数的7/8,放弃获取锁(本进程的连接数过多)
  15.             ngx_accept_disabled--;

  16.         } else {
  17.             if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { //试着获取锁
  18.                 return;
  19.             }

  20.             if (ngx_accept_mutex_held) {//拿到锁
  21.                 flags |= NGX_POST_EVENTS;// event事件延迟处理,先处理 accept事件
  22.             } else {
  23.                 if (timer == NGX_TIMER_INFINITE // 不超时
  24.                     || timer > ngx_accept_mutex_delay)
  25.                 {
  26.                     timer = ngx_accept_mutex_delay; //要等待 ngx_accept_mutex_delay 后再去抢锁
  27.                 }
  28.             }
  29.         }
  30.     }

  31.     delta = ngx_current_msec;

  32.     (void) ngx_process_events(cycle, timer, flags); // ngx_epoll_process_events

  33.     delta = ngx_current_msec - delta;

  34.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "timer delta: %M", delta);

  35.     if (ngx_posted_accept_events) {//延时处理accept事件
  36.         ngx_event_process_posted(cycle, &ngx_posted_accept_events);
  37.     }

  38.     if (ngx_accept_mutex_held) {// 拥有锁,赶紧释放掉
  39.         ngx_shmtx_unlock(&ngx_accept_mutex);
  40.     }
  41.     // 看是否有 定时器信息
  42.     if (delta) {
  43.         ngx_event_expire_timers();
  44.     }

  45.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted events %p", ngx_posted_events);

  46.     if (ngx_posted_events) { // 延时处理事件
  47.         if (ngx_threaded) {
  48.             ngx_wakeup_worker_thread(cycle);
  49.         } else {
  50.             ngx_event_process_posted(cycle, &ngx_posted_events);
  51.         }
  52.     }
  53. }
(4)epoll处理事件
点击(此处)折叠或打开
  1. // epoll 处理事件的函数
  2. static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
  3. {
  4.     int events;
  5.     uint32_t revents;
  6.     ngx_int_t instance, i;
  7.     ngx_uint_t level;
  8.     ngx_err_t err;
  9.     ngx_event_t *rev, *wev, **queue;
  10.     ngx_connection_t *c;

  11.     /* NGX_TIMER_INFINITE == INFTIM */

  12.     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer);

  13.     events = epoll_wait(ep, event_list, (int) nevents, timer);

  14.     err = (events == -1) ? ngx_errno : 0; // 出错了

  15.     if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
  16.         ngx_time_update(); //更新时间
  17.     }

  18.     if (err) {
  19.         if (err == NGX_EINTR) {

  20.             if (ngx_event_timer_alarm) {
  21.                 ngx_event_timer_alarm = 0;
  22.                 return NGX_OK;
  23.             }

  24.             level = NGX_LOG_INFO;

  25.         } else {
  26.             level = NGX_LOG_ALERT;
  27.         }

  28.         ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
  29.         return NGX_ERROR;
  30.     }

  31.     if (events == 0) {
  32.         if (timer != NGX_TIMER_INFINITE) {
  33.             return NGX_OK;
  34.         }

  35.         ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
  36.                       "epoll_wait() returned no events without timeout");
  37.         return NGX_ERROR;
  38.     }

  39.     ngx_mutex_lock(ngx_posted_events_mutex);

  40.     for (i = 0; i < events; i++) {
  41.         c = event_list[i].data.ptr;

  42.         instance = (uintptr_t) c & 1;
  43.         c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

  44.         rev = c->read;

  45.         if (c->fd == -1 || rev->instance != instance) {

  46.             /*
  47.              * the stale event from a file descriptor
  48.              * that was just closed in this iteration
  49.              */

  50.             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c);
  51.             continue;
  52.         }

  53.         revents = event_list[i].events; // EPOLLIN, EPOLLOUT,EPOLLHUP,EPOLLERR

  54.         ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: fd:%d ev:%04XD d:%p"c->fd, revents, event_list[i].data.ptr);

  55.         if (revents & (EPOLLERR|EPOLLHUP)) {
  56.             ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll_wait() error on fd:%d ev:%04XD"c->fd, revents);
  57.         }

  58.         if ((revents & (EPOLLERR|EPOLLHUP))
  59.              && (revents & (EPOLLIN|EPOLLOUT)) == 0)
  60.         {
  61.             /*
  62.              * if the error events were returned without EPOLLIN or EPOLLOUT,
  63.              * then add these flags to handle the events at least in one
  64.              * active handler
  65.              */

  66.             revents |= EPOLLIN|EPOLLOUT;
  67.         }

  68.         if ((revents & EPOLLIN) && rev->active) {//接收状态为active
  69.             // 事件延迟处理 & 不是accept事件
  70.             if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) {
  71.                 rev->posted_ready = 1;

  72.             } else {
  73.                 rev->ready = 1;//立即处理
  74.             }

  75.             if (flags & NGX_POST_EVENTS) {//延迟处理事件
  76.                 queue = (ngx_event_t **) (rev->accept ? // 选择队列
  77.                                &ngx_posted_accept_events : &ngx_posted_events);

  78.                 ngx_locked_post_event(rev, queue);//若延迟处理事件,只是将事件放入队列

  79.             } else { // 立即处理事件
  80.                 rev->handler(rev); //处理事件的回调函数
  81.             }
  82.         }

  83.         wev = c->write;

  84.         if ((revents & EPOLLOUT) && wev->active) {

  85.             if (c->fd == -1 || wev->instance != instance) {

  86.                 /*
  87.                  * the stale event from a file descriptor
  88.                  * that was just closed in this iteration
  89.                  */

  90.                 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll: stale event %p", c);
  91.                 continue;
  92.             }

  93.             if (flags & NGX_POST_THREAD_EVENTS) {
  94.                 wev->posted_ready = 1;

  95.             } else {
  96.                 wev->ready = 1;
  97.             }

  98.             if (flags & NGX_POST_EVENTS) {
  99.                 ngx_locked_post_event(wev, &ngx_posted_events);

  100.             } else {
  101.                 wev->handler(wev); // 处理写事件
  102.             }
  103.         }
  104.     }

  105.     ngx_mutex_unlock(ngx_posted_events_mutex);

  106.     return NGX_OK;
  107. }
(5)event被延迟处理
点击(此处)折叠或打开
  1. void ngx_event_process_posted(ngx_cycle_t *cycle, ngx_thread_volatile ngx_event_t **posted)
  2. {
  3.     ngx_event_t *ev;

  4.     for ( ;; ) {
  5.         ev = (ngx_event_t *) *posted;
  6.         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "posted event %p", ev);

  7.         if (ev == NULL) {
  8.             return;
  9.         }

  10.         ngx_delete_posted_event(ev); //从队列中删掉

  11.         ev->handler(ev); //处理
  12.     }
  13. }

阅读(3283) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~