转自:http://blog.lifeibo.com/?p=317
本文,我们来分析下nginx中对共享内存和锁的使用。
在nginx中,很多地方使用到了共享内存,在我们的应用中,往往有一些数据需要在多进程间进行共享,了解了共享内存的实现与使用,对我们写程序可以提供
很多帮助。在我之前的博文中也有介绍到共享内存的使用与slab分配器,以及红黑树的使用。本文,我将从底层实现上简单介绍下nginx共享内存的实现与
锁的利用。
由于nginx不同版本间会有一些差异,我这里是按照nginx-1.0.6版本来分析。
1. 共享内存
nginx在共享内存操作相关的诉在src/os/unix/ngx_shmem.c与src/os/unix/ngx_shmem.h中。
我们先来看看这个结构体
- typedef struct {
- u_char *addr;
- size_t size;
- ngx_str_t name;
- ngx_log_t *log;
- ngx_uint_t exists;
- } ngx_shm_t;
typedef struct {
u_char *addr; // 共享内存首地址
size_t size; // 共享内存大小
ngx_str_t name; // 共享内存名称
ngx_log_t *log; // 日志
ngx_uint_t exists; /* unsigned exists:1; */
} ngx_shm_t;
在ngx_shmem.c中我们可以看到根据不同的宏,会有不同的共享内存实现方式。我们看看NGX_HAVE_MAP_ANON的这种方式,它很
简单就是使用mmap的方式来创建共享内存。但从代码中,我们可以看出,它创建出来的共享内存是不与文件相关的,也就是不会映射到文件,当然,当
nginx重启后,共享内存里面的内容就消失了。从代码中,我们可以看到,这里没有用到name与exists。
- ngx_int_t
- ngx_shm_alloc(ngx_shm_t *shm)
- {
- shm->addr = (u_char *) mmap(NULL, shm->size,
- PROT_READ|PROT_WRITE,
- MAP_ANON|MAP_SHARED, -1, 0);
- if (shm->addr == MAP_FAILED) {
- ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
- "mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);
- return NGX_ERROR;
- }
- return NGX_OK;
- }
- void
- ngx_shm_free(ngx_shm_t *shm)
- {
- if (munmap((void *) shm->addr, shm->size) == -1) {
- ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
- "munmap(%p, %uz) failed", shm->addr, shm->size);
- }
- }
ngx_int_t
ngx_shm_alloc(ngx_shm_t *shm)
{
shm->addr = (u_char *) mmap(NULL, shm->size,
PROT_READ|PROT_WRITE,
MAP_ANON|MAP_SHARED, -1, 0);
if (shm->addr == MAP_FAILED) {
ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
"mmap(MAP_ANON|MAP_SHARED, %uz) failed", shm->size);
return NGX_ERROR;
}
return NGX_OK;
}
void
ngx_shm_free(ngx_shm_t *shm)
{
if (munmap((void *) shm->addr, shm->size) == -1) {
ngx_log_error(NGX_LOG_ALERT, shm->log, ngx_errno,
"munmap(%p, %uz) failed", shm->addr, shm->size);
}
}
共享内存代码的分析很简单,当然使用也很简单了。而在一般情况下,我们在共享内存里面会存放比较复杂的数据结构,需要经常操作共享内存。而如果我们需要经
常分配与释放共享内存,如果每次分配与释放都去调用mmap与munmap的话,这样效率很非常的底,所以在nginx中会使用slab分配器来辅助共享
内存的使用。通常的做法是,先预先分配出一块较大的共享内存池,然后在之后分配共享内存时,就使用slab分配器从共享内存池里面分配出我们需要的内存大
小。这种方法在我之前的文章中有介绍。
那么,共享内存的创建就很简单了,看代码:
- ngx_shm_t shm;
-
- shm.size = 1024;
- shm.name.len = sizeof("nginx_shared_zone");
- shm.name.data = (u_char *) "nginx_shared_zone";
- shm.log = ngx_cycle->log;
-
- if (ngx_shm_alloc(&shm) != NGX_OK) {
- return NGX_ERROR;
- }
ngx_shm_t shm;
shm.size = 1024;
shm.name.len = sizeof("nginx_shared_zone");
shm.name.data = (u_char *) "nginx_shared_zone";
shm.log = ngx_cycle->log;
if (ngx_shm_alloc(&shm) != NGX_OK) {
return NGX_ERROR;
}
nginx中共享内存的实现比较简单,当然也比较局限。首先,共享内存只会存在于内存中,不会保存到文件,所以当程序退出后,共享内存里面的数据都
会丢失,这需要我们手动去缓存到文件。其次,在使用slab分配器的时候,如果一旦出现共享内存越界的时候,会导致意想不到的后果,而且这种错误无法通过
内存检查工具来检查。所以在使用共享内存时,请务必小心。
2. 锁的实现
先看看ngx_shmtx_t这个结构体:
- typedef struct {
- #if (NGX_HAVE_ATOMIC_OPS)
- ngx_atomic_t *lock;
- #if (NGX_HAVE_POSIX_SEM)
- ngx_uint_t semaphore;
- sem_t sem;
- #endif
- #else
- ngx_fd_t fd;
- u_char *name;
- #endif
- ngx_uint_t spin;
- } ngx_shmtx_t;
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
ngx_atomic_t *lock;
#if (NGX_HAVE_POSIX_SEM)
ngx_uint_t semaphore;
sem_t sem;
#endif
#else
ngx_fd_t fd;
u_char *name;
#endif
ngx_uint_t spin;
} ngx_shmtx_t;
我们可以看到,根据NGX_HAVE_ATOMIC_OPS宏,有两种不同的形式,lock或fd。如果是在fd的情况下,nginx通过对文件句柄的加锁来实现的。对于这种方式,这里不做过多介绍。
在fd模式下,ngx_shmtx_lock调用ngx_lock_fd来实现,而ngx_lock_fd的代码如下:
- ngx_err_t
- ngx_lock_fd(ngx_fd_t fd)
- {
- struct flock fl;
- fl.l_start = 0;
- fl.l_len = 0;
- fl.l_pid = 0;
- fl.l_type = F_WRLCK;
- fl.l_whence = SEEK_SET;
- if (fcntl(fd, F_SETLKW, &fl) == -1) {
- return ngx_errno;
- }
- return 0;
- }
ngx_err_t
ngx_lock_fd(ngx_fd_t fd)
{
struct flock fl;
fl.l_start = 0;
fl.l_len = 0;
fl.l_pid = 0;
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLKW, &fl) == -1) {
return ngx_errno;
}
return 0;
}
在NGX_HAVE_ATOMIC_OPS有设置的情况下,而且没有使用信号量的时候,ngx_shmtx_t这个结构体就变得很简单了:
- typedef struct {
- ngx_atomic_t *lock;
- ngx_uint_t spin;
- } ngx_shmtx_t;
typedef struct {
ngx_atomic_t *lock; // 指向存放在共享内存里面的lock的地址
ngx_uint_t spin; // 自旋锁时,可由它来控制自旋时间
} ngx_shmtx_t;
首先,调用ngx_shmtx_create来创建:
- ngx_int_t
- ngx_shmtx_create(ngx_shmtx_t *mtx, void *addr, u_char *name)
- {
-
- mtx->lock = addr;
-
- if (mtx->spin == (ngx_uint_t) -1) {
- return NGX_OK;
- }
-
- mtx->spin = 2048;
- return NGX_OK;
- }
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, void *addr, u_char *name)
{
// 指向共享内存中的地址
mtx->lock = addr;
// 如果有指定为-1,则表示关掉自旋等待,在后面代码中我们可以看到
if (mtx->spin == (ngx_uint_t) -1) {
return NGX_OK;
}
// 默认为2048
mtx->spin = 2048;
return NGX_OK;
}
在ngx_shmtx_create中,可以看到在非fd模式下面是没有用到name的。在调用时,mtx为本地分配的一个结构体,而addr这个参数,则是在共享内存中分配的一个lock地址。看nginx自己的调用:
- static ngx_int_t
- ngx_event_module_init(ngx_cycle_t *cycle)
- size_t size, cl;
-
-
- cl = 128;
-
- size = cl
- + cl
- + cl;
-
- shm.size = size;
- shm.name.len = sizeof("nginx_shared_zone");
- shm.name.data = (u_char *) "nginx_shared_zone";
- shm.log = cycle->log;
-
-
- if (ngx_shm_alloc(&shm) != NGX_OK) {
- return NGX_ERROR;
- }
-
-
-
- shared = shm.addr;
-
- ngx_accept_mutex.spin = (ngx_uint_t) -1;
-
-
- if (ngx_shmtx_create(&ngx_accept_mutex, shared, cycle->lock_file.data)
- != NGX_OK)
- {
- return NGX_ERROR;
- }
- }
static ngx_int_t
ngx_event_module_init(ngx_cycle_t *cycle)
size_t size, cl;
// 要大于或等待cache line
cl = 128;
size = cl /* ngx_accept_mutex */
+ cl /* ngx_connection_counter */
+ cl; /* ngx_temp_number */
shm.size = size;
shm.name.len = sizeof("nginx_shared_zone");
shm.name.data = (u_char *) "nginx_shared_zone";
shm.log = cycle->log;
// 创建共享内存
if (ngx_shm_alloc(&shm) != NGX_OK) {
return NGX_ERROR;
}
// shm为创建的共享内存
// 因为之前有预留128字节,所以shared指向它是没有问题的
shared = shm.addr;
ngx_accept_mutex.spin = (ngx_uint_t) -1;
// 注意第二个参数,即共享内存的初始位置就是我们要创建shmtx的locked
if (ngx_shmtx_create(&ngx_accept_mutex, shared, cycle->lock_file.data)
!= NGX_OK)
{
return NGX_ERROR;
}
}
在上面这段代码中,我只挑了核心代码,为什么要之前留128字节,大于或等待cache
line呢?其实这里第一个目的是为了留给lock一个空间,另外一个主要的目的是为了提高性能。这样可以将lock与其它数据分开到不同的
cacheline中去,于是,其它数据的修改(其它几个数据是原子操作,可并发修改)就不会导致有lock的cacheline的失效。
cacheline的False sharing问题及其解决方法,可参考余老师http://blog.yufeng.info/archives/tag/cache-line中的介绍。
接下来,就是加锁和解锁了。ngx_shmtx_lock与ngx_shmtx_unlock。看看nginx是如何实现自旋锁的。
ngx_shmtx_lock的实现:
- void
- ngx_shmtx_lock(ngx_shmtx_t *mtx)
- {
- ngx_uint_t i, n;
- ngx_atomic_uint_t val;
- for ( ;; ) {
- val = *mtx->lock;
-
- if ((val & 0x80000000) == 0
- && ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000))
- {
- return;
- }
-
- if (ngx_ncpu > 1) {
- for (n = 1; n < mtx->spin; n <<= 1) {
-
-
- for (i = 0; i < n; i++) {
-
- ngx_cpu_pause();
- }
-
- val = *mtx->lock;
- if ((val & 0x80000000) == 0
- && ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000))
- {
- return;
- }
- }
- }
-
-
- ngx_sched_yield();
- }
- }
void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
ngx_uint_t i, n;
ngx_atomic_uint_t val;
for ( ;; ) {
val = *mtx->lock;
// 如果还没有上锁,就加锁,然后返回,这里容易理解
if ((val & 0x80000000) == 0
&& ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000))
{
return;
}
// 在这里,如果在多核情况下,我们就需要再自旋等待一会了,因为在单核情况下,自旋等待是没有效果的,你都占用cpu了,其它拥有锁的进程又如何释放锁呢。
if (ngx_ncpu > 1) {
for (n = 1; n < mtx->spin; n <<= 1) {
// 每循环一次,就增加一倍的等待时间
// n = 1,2,4,8,16,32,64,128 ...
for (i = 0; i < n; i++) {
// 如果当前体系结构支持,就让cpu等待一会,理由挺多的,可以降低cpu的占用率,当然省电也是一种理由啦
ngx_cpu_pause();
}
// 重新获取最新数据,然后再尝试加锁
val = *mtx->lock;
if ((val & 0x80000000) == 0
&& ngx_atomic_cmp_set(mtx->lock, val, val | 0x80000000))
{
return;
}
}
}
// 如果是单核,就直接给别的进程执行了
// 否则,在自旋一段时间之后,如果还没有成功,则就让出cpu吧
ngx_sched_yield();
}
}
ngx_shmtx_unlock的实现就简单了:
- void
- ngx_shmtx_unlock(ngx_shmtx_t *mtx)
- {
- ngx_atomic_uint_t val, old, wait;
- for ( ;; ) {
- old = *mtx->lock;
- wait = old & 0x7fffffff;
-
-
- val = wait ? wait - 1 : 0;
- if (ngx_atomic_cmp_set(mtx->lock, old, val)) {
- break;
- }
- }
- }
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
ngx_atomic_uint_t val, old, wait;
for ( ;; ) {
old = *mtx->lock;
wait = old & 0x7fffffff;
// 如果有加锁,那wait就是1,那val的值就是0
// 如果未加锁,那wait就是0,val还是0
val = wait ? wait - 1 : 0;
if (ngx_atomic_cmp_set(mtx->lock, old, val)) {
break;
}
}
}
可以看出,nginx实现的自旋锁还是非常高效的。
不过,nginx对锁的实现相对简单,为降低锁的消耗需要编程者小心,尽量减小锁的粒度。而且nginx中没有实现读写锁。
好,今天就介绍到这,Have fun!:)
阅读(2711) | 评论(0) | 转发(0) |