Chinaunix首页 | 论坛 | 博客
  • 博客访问: 3432641
  • 博文数量: 1450
  • 博客积分: 11163
  • 博客等级: 上将
  • 技术积分: 11101
  • 用 户 组: 普通用户
  • 注册时间: 2005-07-25 14:40
文章分类

全部博文(1450)

文章存档

2017年(5)

2014年(2)

2013年(3)

2012年(35)

2011年(39)

2010年(88)

2009年(395)

2008年(382)

2007年(241)

2006年(246)

2005年(14)

分类: LINUX

2011-12-09 10:09:43

转自:http://blog.lifeibo.com/?p=317

本文,我们来分析下nginx中对共享内存和锁的使用。
在nginx中,很多地方使用到了共享内存,在我们的应用中,往往有一些数据需要在多进程间进行共享,了解了共享内存的实现与使用,对我们写程序可以提供 很多帮助。在我之前的博文中也有介绍到共享内存的使用与slab分配器,以及红黑树的使用。本文,我将从底层实现上简单介绍下nginx共享内存的实现与 锁的利用。
由于nginx不同版本间会有一些差异,我这里是按照nginx-1.0.6版本来分析。

1. 共享内存
nginx在共享内存操作相关的诉在src/os/unix/ngx_shmem.c与src/os/unix/ngx_shmem.h中。
我们先来看看这个结构体

  1. typedef struct {  
  2.     u_char      *addr;      // 共享内存首地址  
  3.     size_t       size;      // 共享内存大小  
  4.     ngx_str_t    name;      // 共享内存名称  
  5.     ngx_log_t   *log;       // 日志  
  6.     ngx_uint_t   exists;   /* unsigned  exists:1;  */  
  7. } ngx_shm_t;  
typedef struct { u_char *addr; // 共享内存首地址 size_t size; // 共享内存大小 ngx_str_t name; // 共享内存名称 ngx_log_t *log; // 日志 ngx_uint_t exists; /* unsigned exists:1; */ } ngx_shm_t;

在ngx_shmem.c中我们可以看到根据不同的宏,会有不同的共享内存实现方式。我们看看NGX_HAVE_MAP_ANON的这种方式,它很 简单就是使用mmap的方式来创建共享内存。但从代码中,我们可以看出,它创建出来的共享内存是不与文件相关的,也就是不会映射到文件,当然,当 nginx重启后,共享内存里面的内容就消失了。从代码中,我们可以看到,这里没有用到name与exists。

  1. ngx_int_t  
  2. ngx_shm_alloc(ngx_shm_t *shm)  
  3. {  
  4.     shm->addr = (u_char *) mmap(NULL, shm->size,  
  5.                                 PROT_READ|PROT_WRITE,  
  6.                                 MAP_ANON|MAP_SHARED, -1, 0);  
  7.     if (shm->addr == MAP_FAILED) {  
  8.         ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,  
  9.                       "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);  
  10.         return NGX_ERROR;  
  11.     }  
  12.     return NGX_OK;  
  13. }  
  14. void  
  15. ngx_shm_free(ngx_shm_t *shm)  
  16. {  
  17.     if (munmap((void *) shm->addr, shm->size) == -1) {  
  18.         ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,  
  19.                       "munmap(%p, %uz) failed", shm->addr, shm->size);  
  20.     }  
  21. }  
ngx_int_t ngx_shm_alloc(ngx_shm_t *shm) { shm->addr = (u_char *) mmap(NULL, shm->size, PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0); if (shm->addr == MAP_FAILED) { ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size); return NGX_ERROR; } return NGX_OK; } void ngx_shm_free(ngx_shm_t *shm) { if (munmap((void *) shm->addr, shm->size) == -1) { ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno, "munmap(%p, %uz) failed", shm->addr, shm->size); } }


共享内存代码的分析很简单,当然使用也很简单了。而在一般情况下,我们在共享内存里面会存放比较复杂的数据结构,需要经常操作共享内存。而如果我们需要经 常分配与释放共享内存,如果每次分配与释放都去调用mmap与munmap的话,这样效率很非常的底,所以在nginx中会使用slab分配器来辅助共享 内存的使用。通常的做法是,先预先分配出一块较大的共享内存池,然后在之后分配共享内存时,就使用slab分配器从共享内存池里面分配出我们需要的内存大 小。这种方法在我之前的文章中有介绍。
那么,共享内存的创建就很简单了,看代码:

  1. ngx_shm_t            shm;  
  2.   
  3. shm.size = 1024;  
  4. shm.name.len = sizeof("nginx_shared_zone");  
  5. shm.name.data = (u_char *) "nginx_shared_zone";  
  6. shm.log = ngx_cycle->log;  
  7.   
  8. if (ngx_shm_alloc(&shm) != NGX_OK) {  
  9.     return NGX_ERROR;  
  10. }  
ngx_shm_t shm; shm.size = 1024; shm.name.len = sizeof("nginx_shared_zone"); shm.name.data = (u_char *) "nginx_shared_zone"; shm.log = ngx_cycle->log; if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; }

nginx中共享内存的实现比较简单,当然也比较局限。首先,共享内存只会存在于内存中,不会保存到文件,所以当程序退出后,共享内存里面的数据都 会丢失,这需要我们手动去缓存到文件。其次,在使用slab分配器的时候,如果一旦出现共享内存越界的时候,会导致意想不到的后果,而且这种错误无法通过 内存检查工具来检查。所以在使用共享内存时,请务必小心。

2. 锁的实现
先看看ngx_shmtx_t这个结构体:

  1. typedef struct {  
  2. #if (NGX_HAVE_ATOMIC_OPS)  
  3.     ngx_atomic_t  *lock;  
  4. #if (NGX_HAVE_POSIX_SEM)  
  5.     ngx_uint_t     semaphore;  
  6.     sem_t          sem;  
  7. #endif  
  8. #else  
  9.     ngx_fd_t       fd;  
  10.     u_char        *name;  
  11. #endif  
  12.     ngx_uint_t     spin;  
  13. } ngx_shmtx_t;  
typedef struct { #if (NGX_HAVE_ATOMIC_OPS) ngx_atomic_t *lock; #if (NGX_HAVE_POSIX_SEM) ngx_uint_t semaphore; sem_t sem; #endif #else ngx_fd_t fd; u_char *name; #endif ngx_uint_t spin; } ngx_shmtx_t;

我们可以看到,根据NGX_HAVE_ATOMIC_OPS宏,有两种不同的形式,lock或fd。如果是在fd的情况下,nginx通过对文件句柄的加锁来实现的。对于这种方式,这里不做过多介绍。
在fd模式下,ngx_shmtx_lock调用ngx_lock_fd来实现,而ngx_lock_fd的代码如下:

  1. ngx_err_t  
  2. ngx_lock_fd(ngx_fd_t fd)  
  3. {  
  4.     struct flock  fl;  
  5.     fl.l_start = 0;  
  6.     fl.l_len = 0;  
  7.     fl.l_pid = 0;  
  8.     fl.l_type = F_WRLCK;  
  9.     fl.l_whence = SEEK_SET;  
  10.     if (fcntl(fd, F_SETLKW, &fl) == -1) {  
  11.         return ngx_errno;  
  12.     }  
  13.     return 0;  
  14. }  
ngx_err_t ngx_lock_fd(ngx_fd_t fd) { struct flock fl; fl.l_start = 0; fl.l_len = 0; fl.l_pid = 0; fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET; if (fcntl(fd, F_SETLKW, &fl) == -1) { return ngx_errno; } return 0; }

在NGX_HAVE_ATOMIC_OPS有设置的情况下,而且没有使用信号量的时候,ngx_shmtx_t这个结构体就变得很简单了:

  1. typedef struct {  
  2.     ngx_atomic_t  *lock;        // 指向存放在共享内存里面的lock的地址  
  3.     ngx_uint_t     spin;        // 自旋锁时,可由它来控制自旋时间  
  4. } ngx_shmtx_t;  
typedef struct { ngx_atomic_t *lock; // 指向存放在共享内存里面的lock的地址 ngx_uint_t spin; // 自旋锁时,可由它来控制自旋时间 } ngx_shmtx_t;

首先,调用ngx_shmtx_create来创建:

  1. ngx_int_t  
  2. ngx_shmtx_create(ngx_shmtx_t *mtx, void *addr, u_char *name)  
  3. {  
  4.     // 指向共享内存中的地址  
  5.     mtx->lock = addr;  
  6.     // 如果有指定为-1,则表示关掉自旋等待,在后面代码中我们可以看到  
  7.     if (mtx->spin == (ngx_uint_t) -1) {  
  8.         return NGX_OK;  
  9.     }  
  10.     // 默认为2048  
  11.     mtx->spin = 2048;  
  12.     return NGX_OK;  
  13. }  
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, void *addr, u_char *name) { // 指向共享内存中的地址 mtx->lock = addr; // 如果有指定为-1,则表示关掉自旋等待,在后面代码中我们可以看到 if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } // 默认为2048 mtx->spin = 2048; return NGX_OK; }

在ngx_shmtx_create中,可以看到在非fd模式下面是没有用到name的。在调用时,mtx为本地分配的一个结构体,而addr这个参数,则是在共享内存中分配的一个lock地址。看nginx自己的调用:

  1. static ngx_int_t  
  2. ngx_event_module_init(ngx_cycle_t *cycle)  
  3.     size_t               size, cl;  
  4.   
  5.     // 要大于或等待cache line  
  6.     cl = 128;  
  7.   
  8.     size = cl            /* ngx_accept_mutex */  
  9.            + cl          /* ngx_connection_counter */  
  10.            + cl;         /* ngx_temp_number */  
  11.   
  12.     shm.size = size;  
  13.     shm.name.len = sizeof("nginx_shared_zone");  
  14.     shm.name.data = (u_char *) "nginx_shared_zone";  
  15.     shm.log = cycle->log;  
  16.   
  17.     // 创建共享内存  
  18.     if (ngx_shm_alloc(&shm) != NGX_OK) {  
  19.         return NGX_ERROR;  
  20.     }  
  21.   
  22.     // shm为创建的共享内存  
  23.     // 因为之前有预留128字节,所以shared指向它是没有问题的  
  24.     shared = shm.addr;  
  25.   
  26.     ngx_accept_mutex.spin = (ngx_uint_t) -1;  
  27.   
  28.     // 注意第二个参数,即共享内存的初始位置就是我们要创建shmtx的locked  
  29.     if (ngx_shmtx_create(&ngx_accept_mutex, shared, cycle->lock_file.data)  
  30.         != NGX_OK)  
  31.     {  
  32.         return NGX_ERROR;  
  33.     }  
  34. }  
static ngx_int_t ngx_event_module_init(ngx_cycle_t *cycle) size_t size, cl; // 要大于或等待cache line cl = 128; size = cl /* ngx_accept_mutex */ + cl /* ngx_connection_counter */ + cl; /* ngx_temp_number */ shm.size = size; shm.name.len = sizeof("nginx_shared_zone"); shm.name.data = (u_char *) "nginx_shared_zone"; shm.log = cycle->log; // 创建共享内存 if (ngx_shm_alloc(&shm) != NGX_OK) { return NGX_ERROR; } // shm为创建的共享内存 // 因为之前有预留128字节,所以shared指向它是没有问题的 shared = shm.addr; ngx_accept_mutex.spin = (ngx_uint_t) -1; // 注意第二个参数,即共享内存的初始位置就是我们要创建shmtx的locked if (ngx_shmtx_create(&ngx_accept_mutex, shared, cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; } }

在上面这段代码中,我只挑了核心代码,为什么要之前留128字节,大于或等待cache line呢?其实这里第一个目的是为了留给lock一个空间,另外一个主要的目的是为了提高性能。这样可以将lock与其它数据分开到不同的 cacheline中去,于是,其它数据的修改(其它几个数据是原子操作,可并发修改)就不会导致有lock的cacheline的失效。
cacheline的False sharing问题及其解决方法,可参考余老师http://blog.yufeng.info/archives/tag/cache-line中的介绍。
接下来,就是加锁和解锁了。ngx_shmtx_lock与ngx_shmtx_unlock。看看nginx是如何实现自旋锁的。
ngx_shmtx_lock的实现:

  1. void  
  2. ngx_shmtx_lock(ngx_shmtx_t *mtx)  
  3. {  
  4.     ngx_uint_t         i, n;  
  5.     ngx_atomic_uint_t  val;  
  6.     for ( ;; ) {  
  7.         val = *mtx->lock;  
  8.         // 如果还没有上锁,就加锁,然后返回,这里容易理解  
  9.         if ((val & 0x80000000) == 0  
  10.             && ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000))  
  11.         {  
  12.             return;  
  13.         }  
  14.         // 在这里,如果在多核情况下,我们就需要再自旋等待一会了,因为在单核情况下,自旋等待是没有效果的,你都占用cpu了,其它拥有锁的进程又如何释放锁呢。  
  15.         if (ngx_ncpu > 1) {  
  16.             for (n = 1; n < mtx->spin; n <<= 1) {  
  17.                 // 每循环一次,就增加一倍的等待时间  
  18.                 // n = 1,2,4,8,16,32,64,128 ...  
  19.                 for (i = 0; i < n; i++) {  
  20.                     // 如果当前体系结构支持,就让cpu等待一会,理由挺多的,可以降低cpu的占用率,当然省电也是一种理由啦  
  21.                     ngx_cpu_pause();  
  22.                 }  
  23.                 // 重新获取最新数据,然后再尝试加锁  
  24.                 val = *mtx->lock;  
  25.                 if ((val & 0x80000000) == 0  
  26.                     && ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000))  
  27.                 {  
  28.                     return;  
  29.                 }  
  30.             }  
  31.         }  
  32.         // 如果是单核,就直接给别的进程执行了  
  33.         // 否则,在自旋一段时间之后,如果还没有成功,则就让出cpu吧  
  34.         ngx_sched_yield();  
  35.     }  
  36. }  
void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_uint_t i, n; ngx_atomic_uint_t val; for ( ;; ) { val = *mtx->lock; // 如果还没有上锁,就加锁,然后返回,这里容易理解 if ((val & 0x80000000) == 0 && ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000)) { return; } // 在这里,如果在多核情况下,我们就需要再自旋等待一会了,因为在单核情况下,自旋等待是没有效果的,你都占用cpu了,其它拥有锁的进程又如何释放锁呢。 if (ngx_ncpu > 1) { for (n = 1; n < mtx->spin; n <<= 1) { // 每循环一次,就增加一倍的等待时间 // n = 1,2,4,8,16,32,64,128 ... for (i = 0; i < n; i++) { // 如果当前体系结构支持,就让cpu等待一会,理由挺多的,可以降低cpu的占用率,当然省电也是一种理由啦 ngx_cpu_pause(); } // 重新获取最新数据,然后再尝试加锁 val = *mtx->lock; if ((val & 0x80000000) == 0 && ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000)) { return; } } } // 如果是单核,就直接给别的进程执行了 // 否则,在自旋一段时间之后,如果还没有成功,则就让出cpu吧 ngx_sched_yield(); } }

ngx_shmtx_unlock的实现就简单了:

  1. void  
  2. ngx_shmtx_unlock(ngx_shmtx_t *mtx)  
  3. {  
  4.     ngx_atomic_uint_t  val, old, wait;  
  5.     for ( ;; ) {  
  6.         old = *mtx->lock;  
  7.         wait = old & 0x7fffffff;  
  8.         // 如果有加锁,那wait就是1,那val的值就是0  
  9.         // 如果未加锁,那wait就是0,val还是0  
  10.         val = wait ? wait - 1 : 0;  
  11.         if (ngx_atomic_cmp_set(mtx->lock, old, val)) {  
  12.             break;  
  13.         }  
  14.     }  
  15. }  
void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { ngx_atomic_uint_t val, old, wait; for ( ;; ) { old = *mtx->lock; wait = old & 0x7fffffff; // 如果有加锁,那wait就是1,那val的值就是0 // 如果未加锁,那wait就是0,val还是0 val = wait ? wait - 1 : 0; if (ngx_atomic_cmp_set(mtx->lock, old, val)) { break; } } }

可以看出,nginx实现的自旋锁还是非常高效的。
不过,nginx对锁的实现相对简单,为降低锁的消耗需要编程者小心,尽量减小锁的粒度。而且nginx中没有实现读写锁。

好,今天就介绍到这,Have fun!:)

阅读(2654) | 评论(0) | 转发(0) |
0

上一篇:cache line

下一篇:vimgrep 搜索

给主人留下些什么吧!~~