多生产者,多消费者
-
#include "../unipc.h"
-
-
#define NBUFF 10
-
#define MAX_PRODUCE 100
-
#define MAX_CONSUME 100
-
#define FILE_MODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH)
-
int nitems;
-
int nproducers;
-
int nconsumers;
-
struct {
-
int consume_index;
-
int index;
-
int buff[NBUFF];
-
sem_t mutex,nempty,nstored;
-
} shared;
-
-
void *produce(void *arg)
-
{
-
for(;;) {
-
sem_wait(&shared.nempty);
-
sem_wait(&shared.mutex);
-
if(shared.index >= nitems){
-
sem_post(&shared.nstored);
-
sem_post(&shared.nempty);
-
sem_post(&shared.mutex);
-
return NULL;
-
}
-
shared.buff[shared.index %NBUFF] = shared.index;
-
shared.index++;
-
sem_post(&shared.mutex);
-
sem_post(&shared.nstored);
-
(*(int*)arg)++;
-
}
-
}
-
-
void *consume(void *arg)
-
{
-
for(;;) {
-
sem_wait(&shared.nstored);
-
sem_wait(&shared.mutex);
-
-
if(shared.consume_index >= nitems) {
-
sem_post(&shared.nstored);
-
sem_post(&shared.mutex);
-
return NULL;
-
}
-
-
-
if(shared.consume_index != shared.buff[shared.consume_index%NBUFF]) {
-
printf("consume_index = %d,buff[%d]=%d\n",shared.consume_index,shared.consume_index,shared.buff[shared.consum
-
e_index]);
-
}
-
shared.consume_index++;
-
-
(*((int*)arg))++;
-
-
/*if(shared.consume_index >= nitems) {
-
sem_post(&shared.nstored);
-
sem_post(&shared.mutex);
-
return NULL;
-
}*/
-
sem_post(&shared.mutex);
-
sem_post(&shared.nempty);
-
}
-
}
-
-
int main(int argc ,char *argv[])
-
{
-
int i;
-
int count[MAX_PRODUCE];
-
int consume_count[MAX_CONSUME];
-
pthread_t ptid_produce[MAX_PRODUCE],ptid_consume[MAX_CONSUME];
-
-
if(argc != 4) {
-
printf("usage produce3 <#produces> <#consume> <#items>\n");
-
return -1;
-
}
-
nproducers = atoi(argv[1]);
-
nconsumers = atoi(argv[2]);
-
nitems = atoi(argv[3]);
-
if(nproducers > MAX_PRODUCE)
-
nproducers = MAX_PRODUCE;
-
if(nconsumers > MAX_CONSUME)
-
nconsumers = MAX_CONSUME;
-
printf("nproducers = %d,nitems=%d\n",nproducers,nitems);
-
sem_init(&shared.mutex,0,1);
-
sem_init(&shared.nempty,0,NBUFF);
-
sem_init(&shared.nstored,0,0);
-
shared.index = 0;
-
shared.consume_index = 0;
-
for(i = 0;i < nproducers; i++) {
-
count[i] = 0;
-
pthread_create(&ptid_produce[i],NULL,produce,&count[i]);
-
}
-
for(i = 0;i < nconsumers; i++) {
-
consume_count[i] = 0;
-
pthread_create(&ptid_consume[i],NULL,consume,&consume_count[i]);
-
}
-
-
for(i = 0;i < nproducers; i++) {
-
pthread_join(ptid_produce[i],NULL);
-
printf("count[%d] = %d\n",i,count[i]);
-
}
-
for(i = 0;i < nconsumers; i++) {
-
pthread_join(ptid_consume[i],NULL);
-
printf("consume_count[%d] = %d\n",i,consume_count[i]);
-
}
-
-
sem_destroy(&shared.mutex);
-
sem_destroy(&shared.nempty);
-
sem_destroy(&shared.nstored);
-
return 0;
-
}
此处主要分析一下消费者的终止
与生产者终止相同,消费者线程在消费完所有数据后,所有消费者线程都阻塞在sem_wait(&sem_nstored)上,此时必须要有sem_post(&sem_nstored)来触发进入if判断条件并且终止进程,如下代码:
-
if(shared.consume_index >= nitems) {
-
sem_post(&shared.nstored);-----------这个post同样是为了防止消费者线程数多于NBUFF数后导致多于的线程阻塞
-
sem_post(&shared.mutex);
-
return NULL;
-
}
这个阻塞和if判断条件跟生产者中的很像,但是却有区别,主要体现在,
1.生产者阻塞后,可以有消费者消费完了后把sem_nempty值往上加,这样生产者线程就可以进入判断条件,并且满足判断条件而退出。
2.但是消费者阻塞后,该怎么破除这个阻塞进入if判断条件从而终止线程呢?
答案是在每个生产者线程结束之前sem_post(&shared.nstored).
-
if(shared.consume_index >= nitems) {
-
sem_post(&shared.nstored);
-
sem_post(&shared.mutex);
-
return NULL;
-
}
阅读(1167) | 评论(0) | 转发(0) |