摘要:同步和互斥是进程间很重要的交互模式,而生产者和消费者问题则是同步和互斥的一个景点模型。我认为理解概念应该结合实际来进行,所以本文以实现生产者和消费者模型的形式来讲述同步和互斥。本文先简单谈了个人对同步和互斥的概念的理解,然后用伪码的形式给出了各类生产者和消费者的解决方法,最后用C程序的形式实现了进程版的生产者和消费者模型。PS:C程序的运行平台是linux2.4.8,编译器为gcc4.1.1。
一 同步和互斥的概念
什么是同步?什么是互斥?
同步是一种时序关系。如规定了进程1处理完事情A后,进程2才能处理事情B,经典的同步问题是生产者和消费者间的同步.
互斥描述的是一种独占关系.如任一时刻,进城1和进程2中只能有一个写文件C.
有人说互斥是一种特殊的同步,同步是一种更为复杂的关系(见" http//topic.csdn.net/t/20020711/17/867228.html"),但我不这么认为!
首先上面的言论是矛盾的,如果A和B互相包含,那A只能等于B,而同步和互斥显然是不等的。从前面的描述可以看出,同步和互斥是你中有我,我中有你的关系。当然如果你要从更广义地角度来说互斥是一种特殊的同步,我也没办法,毕竟我们的程序正确运行的前提就是同步。
二 生产者和消费者的问题分析
生产者和消费者的解答网络上有多种线程版本,但却没看到进程版本,所以我就来填补这一“空白”了。PS:使用进程版本的另一个重要原因是,想顺便复习下共享内存。
我们使用信号量来同步,用一个整型数组来当缓冲区。很显然这两者都要能够在各生产者和消费者进程间全局可见,所以我们用共享内存来实现他们。
生产者和消费者问题从易到难有三种。
1. 一个生产者和一个消费者,公用一个缓冲区
解决办法,定义两个信号量。
empty:表示缓冲区是否为空,初值为1,生产者用它来判断缓冲区是否可写。
full:表示缓冲区是否为满,初值为0,消费者用它来判断缓冲区是否可读。
producer(生产者)的伪码:
while(1)
{
P(empty);
写缓冲区;
V(full);
}
consumer(消费者)的伪码:
while(1)
{
P(full);
写缓冲区;
V(empty);
}
2. 一个生产者和一个消费者,公用m个环形缓冲区
分析过程与第一种情况类似,直接看伪码。
producer(生产者)的伪码:
while(1)
{
P(empty); /* empty初值为m */
写第in个缓冲区; /* in用来指示当前的第一个可写的缓冲区的下标,初值设为0。 */
in = (in+1)%m;
V(full);
}
consumer(消费者)的伪码:
while(1)
{
P(full); /* full初值为0 */
读第out个缓冲区; /* out用来指示当前的第一个可读的缓冲区的下标,初值设为0。 */
out = (out+1)%m;
V(empty);
}
3. 一组生产者和一组消费者,公用m个环形缓冲区
相比第2种情况,我们所要做的是用两个互斥变量mutex_producer和mutex_consumer,来实现各生产者间、各消费者间互斥地访问某个缓冲区。
producer(生产者)的伪码:
while(1)
{
P(empty); /* empty初值为m */
P(mutex_producer);
写第in个缓冲区; /* in用来指示当前的第一个可写的缓冲区的下标,初值设为0。 */
in = (in+1)%m;
V(mutex_producer);
V(full);
}
consumer(消费者)的伪码:
while(1)
{
P(full); /* full初值为0 */
P(mutex_consumer);
读第out个缓冲区; /* out用来指示当前的第一个可读的缓冲区的下标,初值设为0。 */
out = (out+1)%m;
V(mutex_consumer);
V(empty);
}
三 生产者和消费者进程版的实现
第一种情况很简单,我们直接来看后两种情况。
1. 一个生产者和一个消费者,公用m个环形缓冲区
文件:producer_consumer.c
#include
#include
#include
#include /* 提供了信号量的相关操作 */
#include "error_plp.h" /* 这是我自定义的一个出错处理函数,具体内容见后 */
#include /* 提供了共享内存的相关操作 */
#include
#include
#define BUFFER_SIZE 10 /* 公用环形缓冲区的大小 */
#define RWRWRW S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH /* 设定创建的文件的访问权限为:用户、组用户和其他用户都可读可写 */
int head, tail; /* 即上面提到的out和in指针,分别用来指示当前第一个可读和可写的缓冲区的下标*/
int value_read, value_write; /* 分别用来保存读到的值和要写的值 */
int *shared_buffer; /* 公用环形缓冲区指针,为了公用,这个指针将指向一个共享内存的数组 */
sem_t *full, *empty; /* 分别指向full和empty这两个信号量的指针,同样地,为了公用,这两个指针指向的信号量在共享内存中实现 */
void producer(void); /* 生产者所执行的代码 */
void consumer(void); /* 消费者所执行的代码 */
int main(void)
{
int fd;
pid_t pid;
void *ptr;
int length;
/* 初始化 */
head = 0;
tail = 0;
value_read = 0;
value_write = 0;
/* 计算共享内存的长度 */
length = 2*sizeof(sem_t) + BUFFER_SIZE*sizeof(int);
/* shm_open是一个POSIX函数,用来打开或创建一个与“/shm”关联的共享内存区 */
if((fd = shm_open("/shm", O_RDWR | O_CREAT, RWRWRW)) == -1)
{
err_exit("shm_open error"); /* 出错提示,可用简单的printf或fprintf代替 */
}
if(ftruncate(fd, length) == -1) /* 截短共享内存的长度到我们所需要的长度 */
{
err_exit("ftruncate error");
}
if((ptr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) /* 将共享内存映射到进程地址空间 */
{
err_exit("mmap error");
}
/* 共享内存的变量布局依次为:full, empty, shared_buffer */
full = (sem_t *)ptr;
empty =((sem_t *)ptr) + 1;
shared_buffer =(int *)(((sem_t *)ptr) + 2 ) ;
sem_init(full, 1, 0); /* 初始化full为0,且进程间共享 */
sem_init(empty, 1, BUFFER_SIZE); /* 初始化empty为BUFFER_SIZE,且进程间共享 */
switch(pid = fork()) /* 生成子进程,因为子进程继承了父进程的地址空间所以,共享内存在父子进程间都可见(这部分地址空间都映射到一个内核区域) */
{
case -1: /* 生成子进程失败 */
err_exit("fork error");
break;
case 0: /* 子进程 */
producer(); /* 子进程是生产者 */
exit(0);
break;
default:
consumer(); /* 父进程是消费者 */
break;
}
shm_unlink("/shm"); /* 删除共享内存区,程序中基本上保证了子进程先退出,因此父进程中无wait操作且这部操作放在父进程这里 */
}
/* 生产者写5次后退出 */
void producer(void)
{
while(value_write < 5) /* 退出条件判定 */
{
sem_wait(empty); /* 是否有空缓冲区,有则占有,无则被挂起,是原子操作 */
sleep(2); /* 休眠2s,测试时休眠时间可修改或用随机数代替 */
value_write++;
shared_buffer[tail] = value_write;
printf("write %5d to position %5d\n", value_write, tail+1);
tail= (tail+1)%BUFFER_SIZE; /* 移动写指针 */
sem_post(full); /* 写完一个缓冲区,释放信号量full(值加1) */
}
}
/* 消费者读5次后退出 */
void consumer(void)
{
while(value_read < 5) /* 退出条件判定 */
{
sem_wait(full); /* 获取信号量 */
sleep(1); * 休眠1s,测试时休眠时间可修改或用随机数代替 *
value_read = shared_buffer[head];
printf("read %5d from position %5d\n", value_read, head+1);
head = (head+1)%BUFFER_SIZE; /* 移动读指针 */
sem_post(empty); /* 读完一个缓冲区,释放信号量empty(值加1) */
}
}
文件:error_plp.h
#ifndef _ERROR_PLP_H
#define _ERROR_PLP_H
#include
#include
/* 以下两个函数都定义在error_plp.c中 */
void err_ret(const char *fmt, ...);
void err_exit(const char *fmt, ...);
#endif /* _ERROR_PLP_H */
文件:error_plp.c
include "error_plp.h"
#include
#include
#include
#include
#include
#ifdef MAXLINE
#undef MAXLINE
#endif
#define MAXLINE 4096
static void err_doit(const char *fmt, va_list ap);
void err_ret(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
err_doit(fmt, ap);
va_end(ap);
}
void err_exit(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
err_doit(fmt, ap);
va_end(ap);
exit(1);
}
static void err_doit(const char *fmt, va_list ap)
{
char buf[MAXLINE];
int ret;
ret = vsnprintf(buf, MAXLINE, fmt, ap);
if(ret < 0)
{
return;
}
snprintf(buf+strlen(buf), MAXLINE-strlen(buf), ": %s", strerror(errno));
strcat(buf, "\n"); /* snprintf has assured the last character */
fflush(stdout);
fputs(buf, stderr);
fflush(NULL); /* necessary? */ /* 估计在重定向里面有用 */
}
可以参照源码中的注释来理解程序。要特别说明的是:这里的信号亮的操作函数代码在librt(实时库)中,所以我们编译链接时要加上-lrt选项,并且注意注意将这几个文件都放到同一文件夹中。
在shell下输入“gcc –o producer_consumer producer_consumer.c error_plp.c -lrt”生成可执行文件。
2. 一组生产者和一组消费者,公用m个环形缓冲区
处理思路请参考前面的伪码。我们的源代码中增加了两个函数:random_generator,用来生成随机数,作休眠时间用,以方便测试;process_create用来生成子进程,这些子进程用来做生产者或消费者。
文件:producer_consumer_n.c
#include
#include
#include
#include /* 提供了信号量的相关操作 */
#include "error_plp.h" /* 这是我自定义的一个出错处理函数,具体内容见前面的error_plp.h和error_plp.c */
#include /* 提供了共享内存的相关操作 */
#include
#include
#include
#include
#define BUFFER_SIZE 10 /* 公用环形缓冲区的大小 */
#define PRODUCER_NUM 5 /* 生产者进程的个数 */
#define CONSUMER_NUM 5 /* 消费者进程的个数 */
#define RWRWRW S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH /* 设定创建的文件的访问权限为:用户、组用户和其他用户都可读可写 */
int *pwrite, *pread; /* 即上面提到的out和in指针,分别用来指示当前第一个可读和可写的缓冲区的下标*/
int value_read, value_write; /* 分别用来保存读到的值和要写的值 */
int *shared_buffer; /* 公用环形缓冲区指针,为了公用,这个指针将指向一个共享内存的数组 */
sem_t *full, *empty; /* 分别指向full和empty这两个信号量的指针,同样地,为了公用,这两个指针指向的信号量在共享内存中实现 */
sem_t *mutex_producer, *mutex_consumer; /* 分别用来互斥生产者间以及消费者间的操作 */
extern int random_generator(int start, int end); /* 用来生成start~end间(包括start和end,是离散闭区间)的随机数 */
void producer(void); /* 生产者所执行的代码 */
void consumer(void); /* 消费者所执行的代码 */
int process_create(pid_t *pid_new, void (*routine)(void)); /* 生成子进程函数。这个函数的接口类似pthread_creat,pid_new用来保存新的子进程的pid;routine是一个函数指针,指向子进程的执行函数 */
int main(void)
{
int i;
int fd;
pid_t pid;
pid_t pid_producer[PRODUCER_NUM], pid_consumer[CONSUMER_NUM]; /* 这两个数组分别用来保存生产者进程和消费者进程的pid */
void *ptr;
int length;
/* 初始化 */
value_read = 0;
value_write = 0;
/* 计算共享内存的长度 */
length = 4*sizeof(sem_t) + (BUFFER_SIZE + 2)*sizeof(int);
/* shm_open是一个POSIX函数,用来打开或创建一个与“/shm”关联的共享内存区 */
if((fd = shm_open("/shm", O_RDWR | O_CREAT, RWRWRW)) == -1)
{
err_exit("shm_open error"); /* 出错提示,可用简单的printf或fprintf代替 */
}
if(ftruncate(fd, length) == -1) /* 截短共享内存的长度到我们所需要的长度 */
{
err_exit("ftruncate error");
}
if((ptr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) /* 将共享内存(内核中的一个区域)映射到进程地址空间 */
{
err_exit("mmap error");
}
/* 共享内存的变量布局依次为:full, empty, mutex_producer, mutex_consumer, pwrite, pread, shared_buffer */
full = (sem_t *)ptr;
empty =((sem_t *)ptr) + 1;
mutex_producer = ((sem_t *)ptr) + 2;
mutex_consumer = ((sem_t *)ptr) + 3;
pwrite = (int *)(((sem_t *)ptr) + 4);
pread = (int *)(((sem_t *)ptr) + 4) + 1;
shared_buffer =(int *)(((sem_t *)ptr) + 4 ) + 2 ;
/* 初始化 */
sem_init(full, 1, 0); /* 初始化full为0,且进程间共享 */
sem_init(empty, 1, BUFFER_SIZE); /* 初始化empty为BUFFER_SIZE,且进程间共享 */
sem_init(mutex_producer, 1, 1); /* 初始化mutex_producer为1,且进程间共享,用来互斥生产者间的操作 */
sem_init(mutex_consumer, 1, 1); /* 初始化mutex_consumer为1,且进程间共享,用来互斥消费者间的操作 */
*pwrite = 0; /* 初始化写指针为0,即从第一个缓冲区开始写 */
*pread = 0; /* 初始化读指针为0,即从第一个缓冲区开始读(当然必须在生产者放入了产品后才能读) */
for(i = 0; i < PRODUCER_NUM; i++) /* 生成生产者进程 */
{
if(process_create(&pid_producer[i], producer) != 0) /* 生产者进程的执行函数为producer */
{
/* kill(0, signum) */ /* 生成失败,尚无好的处理办法 */
}
}
for(i = 0; i < CONSUMER_NUM; i++) /* 生成消费者进程 */
{
if(process_create(&pid_consumer[i], consumer) != 0) * 消费者进程的执行函数为consumer */
{
/* kill(0, signum) */ /* 生成失败,尚无好的处理办法 */
}
}
for(i = 0; i < PRODUCER_NUM + CONSUMER_NUM; i++) /* wait处理,避免僵尸进程(zombie) */
{
waitpid(0, NULL, 0);
}
shm_unlink("/shm"); /* 父进程是最后退出的,所以在他这里删除共享内存区 */
return 0;
}
/* 生产者写10次后退出 */
void producer(void)
{
while(value_write < 10) /* 判定退出条件 */
{
sem_wait(empty); /* 是否有空缓冲区,有则占有,无则被挂起,是原子操作 */
sleep(random_generator(1, 5)); /* 休眠一段随机的时间(1s~5s,包括端点) */
sem_wait(mutex_producer); /* 获取互斥量,用来访问pwrite操作 */
value_write++;
shared_buffer[*pwrite] = value_write; /* 注意互斥区操作应尽可能少,把这个语句和后面的打印语句放到互斥区里面,是为了更准确的查看测试结果(如果不放到互斥区,则打印的顺序是不确定的) */
printf("in pid: %ld, write %5d to position %5d\n", (long)getpid(), value_write, *pwrite+1);
*pwrite= (*pwrite+1)%BUFFER_SIZE; /* 修改写指针 */
sem_post(mutex_producer); /* 释放互斥量 */
sem_post(full); /* 写完一个缓冲区,释放信号量full(值加1) */
}
}
/* 消费者写10次后退出 */
void consumer(void)
{
while(value_read < 10) /* 判定退出条件 */
{
sem_wait(full); /* 是否有可读的缓冲区,有则占有,无则被挂起,是原子操作 */
sleep(random_generator(1, 5)); /* 休眠一段随机的时间(1s~5s,包括端点) */
sem_wait(mutex_consumer); /* 获取互斥量,用来访问pread */
value_read = shared_buffer[*pread]; /* 注意互斥区操作应尽可能少,把这个语句和后面的打印语句放到互斥区里面,是为了更准确的查看测试结果(如果不放到互斥区,则打印的顺序是不确定的) */
printf("in pid: %d, read %5d from position %5d\n", (long)getpid(), value_read, *pread+1);
*pread= (*pread+1)%BUFFER_SIZE; /* 修改读指针 */
sem_post(mutex_consumer); /* 释放互斥量 */
sem_post(empty); /* 读完一个缓冲区,释放信号量empty(值加1) */
}
}
/* 生成子进程函数。这个函数的接口类似pthread_creat,pid_new用来保存新的子进程的pid;routine是一个函数指针,指向子进程的执行函数 */
int process_create(pid_t *pid_new, void (*routine)(void))
{
pid_t pid;
switch(pid = fork())
{
case -1:
return errno;
break;
case 0: /* 子进程执行完routine后退出 */
routine();
exit(0);
break;
default:
*pid_new = pid;
return 0;
break;
}
}
/********************************random_generator****************************
**since all the random number can be normalized to: *
** from start to end, not include start and end. *
** where end>start>0 && start and end are integers *
******************************************************************************/
int random_generator(int start, int end)
{
int num;
struct timeval seed;
gettimeofday(&seed, NULL); /* 获取当前时间 */
srand(seed.tv_usec); /* 以当前时间的微秒值作随机种子 */
num = start + (int)((float)(end-start+1)*(rand()/(float)(RAND_MAX + 1.0))); /* 生成start~end间的随机数 */
return num;
}
同样地,保证各文件在同一文件夹中,error_plp.h和error_plp.c请参看第1部分的代码。在shell下输入“gcc –o producer_consumer_n producer_consumer_n.c error_plp.c -lrt”生成可执行文件。
关于这个程序中的同步和互斥操作,很多网上资料都说必须先同步再互斥,否则会有死锁,我认为这是错的。事实上,生产者中先同步再互斥,而消费者先互斥再同步,或反之;以及生产者和消费者都先互斥再同步这几种情况都不会死锁,因为它们间并没有交叉关系,就更不可能形成死锁环。之所以先同步,再互斥,是为了更好的并发性:并发性的瓶颈是互斥区,先同步再互斥,使得互斥区代码更短。
同步和互斥的进一步讨论,将在后面的几篇文章中陆续给出,敬请多关注^_^