Chinaunix首页 | 论坛 | 博客
  • 博客访问: 535844
  • 博文数量: 142
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1452
  • 用 户 组: 普通用户
  • 注册时间: 2013-09-12 16:28
文章分类

全部博文(142)

文章存档

2016年(10)

2015年(60)

2014年(72)

我的朋友

分类: C/C++

2014-12-23 11:05:08

多生产者,多消费者

点击(此处)折叠或打开

  1. #include "../unipc.h"

  2. #define NBUFF 10
  3. #define MAX_PRODUCE 100
  4. #define MAX_CONSUME 100
  5. #define FILE_MODE (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH)
  6. int nitems;
  7. int nproducers;
  8. int nconsumers;
  9. struct {
  10.     int consume_index;
  11.     int index;
  12.     int buff[NBUFF];
  13.     sem_t mutex,nempty,nstored;
  14. } shared;

  15. void *produce(void *arg)
  16. {
  17.     for(;;) {
  18.         sem_wait(&shared.nempty);
  19.         sem_wait(&shared.mutex);
  20.         if(shared.index >= nitems){
  21.             sem_post(&shared.nstored);
  22.             sem_post(&shared.nempty);
  23.             sem_post(&shared.mutex);
  24.             return NULL;
  25.         }
  26.         shared.buff[shared.index %NBUFF] = shared.index;
  27.         shared.index++;
  28.         sem_post(&shared.mutex);
  29.         sem_post(&shared.nstored);
  30.         (*(int*)arg)++;
  31.     }
  32. }

  33. void *consume(void *arg)
  34. {
  35.     for(;;) {
  36.         sem_wait(&shared.nstored);
  37.         sem_wait(&shared.mutex);
  38.  
  39.         if(shared.consume_index >= nitems) {
  40.             sem_post(&shared.nstored);
  41.             sem_post(&shared.mutex);
  42.             return NULL;
  43.         }

  44.        
  45.         if(shared.consume_index != shared.buff[shared.consume_index%NBUFF]) {
  46.             printf("consume_index = %d,buff[%d]=%d\n",shared.consume_index,shared.consume_index,shared.buff[shared.consum
  47. e_index]);
  48.         }
  49.         shared.consume_index++;
  50.         
  51.         (*((int*)arg))++;
  52.         
  53.         /*if(shared.consume_index >= nitems) {
  54.             sem_post(&shared.nstored);
  55.             sem_post(&shared.mutex);
  56.             return NULL;
  57.         }*/
  58.         sem_post(&shared.mutex);
  59.         sem_post(&shared.nempty);
  60.     }
  61. }

  62. int main(int argc ,char *argv[])
  63. {
  64.     int i;
  65.     int count[MAX_PRODUCE];
  66.     int consume_count[MAX_CONSUME];
  67.     pthread_t ptid_produce[MAX_PRODUCE],ptid_consume[MAX_CONSUME];

  68.     if(argc != 4) {
  69.         printf("usage produce3 <#produces> <#consume> <#items>\n");
  70.         return -1;
  71.     }
  72.     nproducers = atoi(argv[1]);
  73.     nconsumers = atoi(argv[2]);
  74.     nitems = atoi(argv[3]);
  75.     if(nproducers > MAX_PRODUCE)
  76.         nproducers = MAX_PRODUCE;
  77.     if(nconsumers > MAX_CONSUME)
  78.         nconsumers = MAX_CONSUME;
  79.     printf("nproducers = %d,nitems=%d\n",nproducers,nitems);
  80.     sem_init(&shared.mutex,0,1);
  81.     sem_init(&shared.nempty,0,NBUFF);
  82.     sem_init(&shared.nstored,0,0);
  83.     shared.index = 0;
  84.     shared.consume_index = 0;
  85.     for(i = 0;i < nproducers; i++) {
  86.         count[i] = 0;
  87.         pthread_create(&ptid_produce[i],NULL,produce,&count[i]);
  88.     }
  89.     for(i = 0;i < nconsumers; i++) {
  90.         consume_count[i] = 0;
  91.         pthread_create(&ptid_consume[i],NULL,consume,&consume_count[i]);
  92.     }

  93.     for(i = 0;i < nproducers; i++) {
  94.         pthread_join(ptid_produce[i],NULL);
  95.         printf("count[%d] = %d\n",i,count[i]);
  96.     }
  97.     for(i = 0;i < nconsumers; i++) {
  98.         pthread_join(ptid_consume[i],NULL);
  99.         printf("consume_count[%d] = %d\n",i,consume_count[i]);
  100.     }

  101.     sem_destroy(&shared.mutex);
  102.     sem_destroy(&shared.nempty);
  103.     sem_destroy(&shared.nstored);
  104.     return 0;
  105. }
此处主要分析一下消费者的终止
与生产者终止相同,消费者线程在消费完所有数据后,所有消费者线程都阻塞在sem_wait(&sem_nstored)上,此时必须要有sem_post(&sem_nstored)来触发进入if判断条件并且终止进程,如下代码:

点击(此处)折叠或打开

  1. if(shared.consume_index >= nitems) {
  2.             sem_post(&shared.nstored);-----------这个post同样是为了防止消费者线程数多于NBUFF数后导致多于的线程阻塞
  3.             sem_post(&shared.mutex);
  4.             return NULL;
  5. }
这个阻塞和if判断条件跟生产者中的很像,但是却有区别,主要体现在,
1.生产者阻塞后,可以有消费者消费完了后把sem_nempty值往上加,这样生产者线程就可以进入判断条件,并且满足判断条件而退出。
2.但是消费者阻塞后,该怎么破除这个阻塞进入if判断条件从而终止线程呢? 
   答案是在每个生产者线程结束之前sem_post(&shared.nstored).

点击(此处)折叠或打开

  1. if(shared.consume_index >= nitems) {
  2.             sem_post(&shared.nstored);
  3.             sem_post(&shared.mutex);
  4.             return NULL;
  5. }


阅读(1121) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~