参考:
OS: 生产者消费者问题(多进程+共享内存+信号量)
http://blog.csdn.net/yaozhiyi/article/details/7561759
OS: 生产者消费者问题(多线程+互斥量+条件变量)
http://blog.csdn.net/yaozhiyi/article/details/7563050
生产者和消费者的问题:
生产者和消费者问题是多个相互合作的进程之间的一种抽象。生产者和消费者之间的关系:
1. 对缓冲区的访问是互斥的。由于两者都会修改缓冲区,因此,一方修改缓冲区时,另一方不能修改,这就是互斥。
2. 一方的行为影响另一方。缓冲区不空,才能消费,何时不空?生产了就不空;缓冲区满,就不能生产,何时不满?消费了就不满。这是同步关系。
为了描述这种关系,一方面,使用共享内存代表缓冲区;另一方面,使用 互斥信号量 控制对缓冲区的访问,使用同步信号量描述两者的依赖关系。
共享内存
共享存储的原理是将进程的地址空间映射到一个共享存储段。
shmget(得到一个共享内存标识符或创建一个共享内存对象)
shmat(把共享内存区对象映射到调用进程的地址空间)
shmdt(断开共享内存连接)
shmctl(共享内存管理)
参考:
// IPC_PRIVATE 表示创建信号量集, NUM_OF_SEM表示该集合中有多少信号量; FLAGS复杂不追究
semget(IPC_PRIVATE, NUM_OF_SEM, FLAGS );
// SEM_KEY 是 key_t 类型
//如果 SEM_KEY 代表的信号量集存在,则返回信号量集的ID
//如果不存在,则创建信号量集并返回ID
semget(SEM_KEY, NUM_OF_SEM,FLAGS);
2. 初始化信号量
创建的过程并未指定信号量的初始值,需要使用 semctl 函数指定。
semctl(int semSetId , int semIdx , int cmd, union semun su);
其中 semSetId 是指信号量集的 ID , semIdx 指信号量集中某个信号量的索引(从零开始), 如果是要设置信号量的值, 填 SETVAL 即可, 为了设置信号量的值,可以指定su.val为索要设置的值。
解释都在注释里面!
-
#include <stdio.h>
-
#include <sys/shm.h>
-
#include <sys/sem.h>
-
#include <stdlib.h>
-
#include <unistd.h>
-
#include <sys/types.h>
-
#define SHM_SIZE (1024*1024)
-
#define SHM_MODE 0600
-
#define SEM_MODE 0600
-
-
#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
-
/* union semun is defined by including <sys/sem.h> */
-
#else
-
/* according to X/OPEN we have to define it ourselves */
-
union semun{
-
int val;
-
struct semid_ds *buf;
-
unsigned short *array;
-
};
-
#endif
-
-
struct ShM{
-
int start;
-
int end;
-
}* pSM;
-
-
const int N_CONSUMER = 3;//消费者数量
-
const int N_BUFFER = 5;//缓冲区容量
-
int shmId = -1,semSetId=-1;
-
union semun su;//sem union,用于初始化信号量
-
-
//semSetId 表示信号量集合的 id
-
//semNum 表示要处理的信号量在信号量集合中的索引
-
void waitSem(int semSetId,int semNum)
-
{
-
struct sembuf sb;
-
sb.sem_num = semNum;
-
sb.sem_op = -1;//表示要把信号量减一
-
sb.sem_flg = SEM_UNDO;//
-
//第二个参数是 sembuf [] 类型的,表示数组
-
//第三个参数表示 第二个参数代表的数组的大小
-
if(semop(semSetId,&sb,1) < 0){
-
perror("waitSem failed");
-
exit(1);
-
}
-
}
-
void sigSem(int semSetId,int semNum)
-
{
-
struct sembuf sb;
-
sb.sem_num = semNum;
-
sb.sem_op = 1;
-
sb.sem_flg = SEM_UNDO;
-
//第二个参数是 sembuf [] 类型的,表示数组
-
//第三个参数表示 第二个参数代表的数组的大小
-
if(semop(semSetId,&sb,1) < 0){
-
perror("waitSem failed");
-
exit(1);
-
}
-
}
-
//必须在保证互斥以及缓冲区不满的情况下调用
-
void produce()
-
{
-
int last = pSM->end;
-
pSM->end = (pSM->end+1) % N_BUFFER;
-
printf("生产 %d\n",last);
-
}
-
//必须在保证互斥以及缓冲区不空的情况下调用
-
void consume()
-
{
-
int last = pSM->start;
-
pSM->start = (pSM->start + 1)%N_BUFFER;
-
printf("消耗 %d\n",last);
-
}
-
-
void init()
-
{
-
//缓冲区分配以及初始化
-
if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
-
{
-
perror("create shared memory failed");
-
exit(1);
-
}
-
pSM = (struct ShM *)shmat(shmId,0,0);
-
pSM->start = 0;
-
pSM->end = 0;
-
-
//信号量创建
-
//第一个:同步信号量,表示先后顺序,必须有空间才能生产
-
//第二个:同步信号量,表示先后顺序,必须有产品才能消费
-
//第三个:互斥信号量,生产者和每个消费者不能同时进入缓冲区
-
-
if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
-
{
-
perror("create semaphore failed");
-
exit(1);
-
}
-
//信号量初始化,其中 su 表示 union semun
-
su.val = N_BUFFER;//当前库房还可以接收多少产品
-
if(semctl(semSetId,0,SETVAL, su) < 0){
-
perror("semctl failed");
-
exit(1);
-
}
-
su.val = 0;//当前没有产品
-
if(semctl(semSetId,1,SETVAL,su) < 0){
-
perror("semctl failed");
-
exit(1);
-
}
-
su.val = 1;//为1时可以进入缓冲区
-
if(semctl(semSetId,2,SETVAL,su) < 0){
-
perror("semctl failed");
-
exit(1);
-
}
-
}
-
int main()
-
{
-
int i = 0,child = -1;
-
init();
-
//创建 多个(N_CONSUMER)消费者子进程
-
for(i = 0; i < N_CONSUMER; i++)
-
{
-
if((child = fork()) < 0)//调用fork失败
-
{
-
perror("the fork failed");
-
exit(1);
-
}
-
else if(child == 0)//子进程
-
{
-
printf("我是第 %d 个消费者子进程,PID = %d\n",i,getpid());
-
while(1)
-
{
-
waitSem(semSetId,1);//必须有产品才能消费
-
waitSem(semSetId,2);//锁定缓冲区
-
consume();//获得产品,需要修改缓冲区
-
sigSem(semSetId,2);//释放缓冲区
-
sigSem(semSetId,0);//告知生产者,有空间了
-
sleep(2);//消费频率
-
}
-
break;//务必有
-
}
-
}
-
-
-
//父进程开始生产
-
if(child > 0)
-
{
-
while(1)
-
{
-
waitSem(semSetId,0);//获取一个空间用于存放产品
-
waitSem(semSetId,2);//占有产品缓冲区
-
produce();
-
sigSem(semSetId,2);//释放产品缓冲区
-
sleep(1);//每两秒生产一个
-
sigSem(semSetId,1);//告知消费者有产品了
-
}
-
}
-
return 0;
-
}
互斥量实际上相当于二元信号量,它是纯天然适合生产者消费者问题的解决方案,使用互斥量可以很好地描述生产者或者消费者独占缓冲区的特点。
不过互斥量的能力也仅此而已,如果需要在使用线程方案时提供更复杂的逻辑,则需要配合使用条件变量。生产者要求在缓冲区不满的情况下才能生产,我用 notFull 条件变量表示这种情况;消费者要求在缓冲区不空的情况下才能消费,我用 notEmpty 条件描述这种情况。
条件变量与互斥量配合以实现线程的同步,通常是以下结构:
-
pthread_mutex_wait(&mtx);
-
while(condition not fullfilled){
-
pthread_cond_wait(&cond,&mtx);
-
}
-
//在此处修改缓冲区
-
pthread_mutex_signal(&mtx);
其中,cond 是 pthread_cond_t 类型的对象,mtx 是 pthread_mutex_t 类型的对象。pthread_cond_wait 在不同条件下行为不同:
1. 当执行 pthread_cond_wait 时,作为一个原子操作包含以下两步:
1) 解锁互斥量 mtx
2) 阻塞进程直到其它线程调用 pthread_cond_signal 以告知 cond 可以不阻塞
2. 当执行 pthread_cond_signal(&cond) 时,作为原子操作包含以下两步:
1) 给 mtx 加锁
2)停止阻塞线程, 因而得以再次执行循环,判断条件是否满足。(注意到此时 mtx 仍然被当前线程独有,保证互斥)
-
#include "stdio.h"
-
#include <stdlib.h>
-
#include <pthread.h>
-
-
-
#define N_CONSUMER 3 //消费者数量
-
#define N_PRODUCER 2 //生产者数量
-
#define C_SLEEP 1 //控制 consumer 消费的节奏
-
#define P_SLEEP 1 //控制 producer 生产的节奏
-
-
pthread_t ctid[N_CONSUMER];//consumer thread id
-
pthread_t ptid[N_PRODUCER];//producer thread id
-
-
pthread_cond_t notFull,notEmpty;//缓冲区不满;缓冲区不空
-
pthread_mutex_t buf = PTHREAD_MUTEX_INITIALIZER;//用于锁住缓冲区
-
-
//从 begin 到 end(不含end) 代表产品
-
//cnt 代表产品数量
-
//max 代表库房的容量,即最多生产多少产品
-
int begin = 0,end = 0, cnt = 0, max = 4;
-
-
void * consumer(void * pidx)//consumer thread idx
-
{
-
printf("consumer thread id %d\n",*((int *)pidx));
-
while(1)
-
{
-
pthread_mutex_lock(&buf);
-
while(cnt == 0){//当缓冲区空时
-
pthread_cond_wait(?Empty,&buf);
-
}
-
printf("consume %d\n",begin);
-
begin = (begin+1)%max;
-
cnt--;
-
pthread_mutex_unlock(&buf);
-
sleep(C_SLEEP);
-
pthread_cond_signal(?Full);
-
}
-
pthread_exit((void *)0);
-
}
-
void * producer(void * pidx)//producer thread idx
-
{
-
printf("producer thread id %d\n",*((int *)pidx));
-
while(1)
-
{
-
pthread_mutex_lock(&buf);
-
while(cnt == max){//当缓冲区满时
-
pthread_cond_wait(?Full,&buf);
-
}
-
printf("produce %d\n",end);
-
end = (end+1)%max;
-
cnt++;
-
pthread_mutex_unlock(&buf);
-
sleep(P_SLEEP);
-
pthread_cond_signal(?Empty);
-
}
-
pthread_exit((void *)0);
-
}
-
-
int main()
-
{
-
int i = 0;
-
for(i = 0; i < N_CONSUMER; i++)
-
{
-
;int * j = (int *) malloc(sizeof(int));
-
*j = i;
-
if(pthread_create(&ctid[i],NULL,consumer,j) != 0)
-
{
-
perror("create consumer failed\n");
-
exit(1);
-
}
-
}
-
for(i = 0; i < N_PRODUCER; i++)
-
{
-
int * j = (int *) malloc(sizeof(int));
-
*j = i;
-
if(pthread_create(&ptid[i],NULL,producer,j) != 0)
-
{
-
perror("create producer failed\n");
-
exit(1);
-
}
-
}
-
while(1)
-
{
-
sleep(10);
-
}
-
return 0;
-
}
阅读(5167) | 评论(0) | 转发(0) |