Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1456704
  • 博文数量: 218
  • 博客积分: 6394
  • 博客等级: 准将
  • 技术积分: 2563
  • 用 户 组: 普通用户
  • 注册时间: 2008-02-08 15:33
个人简介

持之以恒

文章分类

全部博文(218)

文章存档

2013年(8)

2012年(2)

2011年(21)

2010年(55)

2009年(116)

2008年(16)

分类: LINUX

2013-03-13 16:17:59

参考:
OS: 生产者消费者问题(多进程+共享内存+信号量)
http://blog.csdn.net/yaozhiyi/article/details/7561759

OS: 生产者消费者问题(多线程+互斥量+条件变量)
http://blog.csdn.net/yaozhiyi/article/details/7563050

生产者和消费者的问题:
生产者和消费者问题是多个相互合作的进程之间的一种抽象。生产者和消费者之间的关系:
1.  对缓冲区的访问是互斥的。由于两者都会修改缓冲区,因此,一方修改缓冲区时,另一方不能修改,这就是互斥。
2.  一方的行为影响另一方。缓冲区不空,才能消费,何时不空?生产了就不空;缓冲区满,就不能生产,何时不满?消费了就不满。这是同步关系。
为了描述这种关系,一方面,使用共享内存代表缓冲区;另一方面,使用 互斥信号量 控制对缓冲区的访问,使用同步信号量描述两者的依赖关系。
共享内存
共享存储的原理是将进程的地址空间映射到一个共享存储段
shmget(得到一个共享内存标识符或创建一个共享内存对象)
shmat(把共享内存区对象映射到调用进程的地址空间)
shmdt(断开共享内存连接)
shmctl(共享内存管理)
参考:

// IPC_PRIVATE 表示创建信号量集, NUM_OF_SEM表示该集合中有多少信号量; FLAGS复杂不追究
semget(IPC_PRIVATE, NUM_OF_SEM, FLAGS );
// SEM_KEY 是 key_t 类型
//如果 SEM_KEY 代表的信号量集存在,则返回信号量集的ID
//如果不存在,则创建信号量集并返回ID
semget(SEM_KEY, NUM_OF_SEM,FLAGS);

2. 初始化信号量

创建的过程并未指定信号量的初始值,需要使用 semctl 函数指定。
semctl(int semSetId , int semIdx , int cmd, union semun su);

其中 semSetId 是指信号量集的 ID , semIdx 指信号量集中某个信号量的索引(从零开始), 如果是要设置信号量的值, 填 SETVAL 即可, 为了设置信号量的值,可以指定su.val为索要设置的值。

解释都在注释里面!

点击(此处)折叠或打开

  1. #include <stdio.h>
  2. #include <sys/shm.h>
  3. #include <sys/sem.h>
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #include <sys/types.h>
  7. #define SHM_SIZE (1024*1024)
  8. #define SHM_MODE 0600
  9. #define SEM_MODE 0600

  10. #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
  11. /* union semun is defined by including <sys/sem.h> */
  12. #else
  13. /* according to X/OPEN we have to define it ourselves */
  14. union semun{
  15.     int val;
  16.     struct semid_ds *buf;
  17.     unsigned short *array;
  18. };
  19. #endif

  20. struct ShM{
  21.     int start;
  22.     int end;
  23. }* pSM;

  24. const int N_CONSUMER = 3;//消费者数量
  25. const int N_BUFFER = 5;//缓冲区容量
  26. int shmId = -1,semSetId=-1;
  27. union semun su;//sem union,用于初始化信号量

  28. //semSetId 表示信号量集合的 id
  29. //semNum 表示要处理的信号量在信号量集合中的索引
  30. void waitSem(int semSetId,int semNum)
  31. {
  32.     struct sembuf sb;
  33.     sb.sem_num = semNum;
  34.     sb.sem_op = -1;//表示要把信号量减一
  35.     sb.sem_flg = SEM_UNDO;//
  36.     //第二个参数是 sembuf [] 类型的,表示数组
  37.     //第三个参数表示 第二个参数代表的数组的大小
  38.     if(semop(semSetId,&sb,1) < 0){
  39.         perror("waitSem failed");
  40.         exit(1);
  41.     }
  42. }
  43. void sigSem(int semSetId,int semNum)
  44. {
  45.     struct sembuf sb;
  46.     sb.sem_num = semNum;
  47.     sb.sem_op = 1;
  48.     sb.sem_flg = SEM_UNDO;
  49.     //第二个参数是 sembuf [] 类型的,表示数组
  50.     //第三个参数表示 第二个参数代表的数组的大小
  51.     if(semop(semSetId,&sb,1) < 0){
  52.         perror("waitSem failed");
  53.         exit(1);
  54.     }
  55. }
  56. //必须在保证互斥以及缓冲区不满的情况下调用
  57. void produce()
  58. {
  59.     int last = pSM->end;
  60.     pSM->end = (pSM->end+1) % N_BUFFER;
  61.     printf("生产 %d\n",last);
  62. }
  63. //必须在保证互斥以及缓冲区不空的情况下调用
  64. void consume()
  65. {
  66.     int last = pSM->start;
  67.     pSM->start = (pSM->start + 1)%N_BUFFER;
  68.     printf("消耗 %d\n",last);
  69. }

  70. void init()
  71. {
  72.     //缓冲区分配以及初始化
  73.     if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0)
  74.     {
  75.         perror("create shared memory failed");
  76.         exit(1);
  77.     }
  78.     pSM = (struct ShM *)shmat(shmId,0,0);
  79.     pSM->start = 0;
  80.     pSM->end = 0;
  81.     
  82.     //信号量创建
  83.     //第一个:同步信号量,表示先后顺序,必须有空间才能生产
  84.     //第二个:同步信号量,表示先后顺序,必须有产品才能消费
  85.     //第三个:互斥信号量,生产者和每个消费者不能同时进入缓冲区

  86.     if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0)
  87.     {
  88.         perror("create semaphore failed");
  89.         exit(1);
  90.     }
  91.     //信号量初始化,其中 su 表示 union semun
  92.     su.val = N_BUFFER;//当前库房还可以接收多少产品
  93.     if(semctl(semSetId,0,SETVAL, su) < 0){
  94.         perror("semctl failed");
  95.         exit(1);
  96.     }
  97.     su.val = 0;//当前没有产品
  98.     if(semctl(semSetId,1,SETVAL,su) < 0){
  99.         perror("semctl failed");
  100.         exit(1);
  101.     }
  102.     su.val = 1;//为1时可以进入缓冲区
  103.     if(semctl(semSetId,2,SETVAL,su) < 0){
  104.         perror("semctl failed");
  105.         exit(1);
  106.     }
  107. }
  108. int main()
  109. {
  110.     int i = 0,child = -1;
  111.     init();
  112.     //创建 多个(N_CONSUMER)消费者子进程
  113.     for(i = 0; i < N_CONSUMER; i++)
  114.     {
  115.         if((child = fork()) < 0)//调用fork失败
  116.         {
  117.             perror("the fork failed");
  118.             exit(1);
  119.         }
  120.         else if(child == 0)//子进程
  121.         {
  122.             printf("我是第 %d 个消费者子进程,PID = %d\n",i,getpid());
  123.             while(1)
  124.             {
  125.                 waitSem(semSetId,1);//必须有产品才能消费
  126.                 waitSem(semSetId,2);//锁定缓冲区
  127.                 consume();//获得产品,需要修改缓冲区
  128.                 sigSem(semSetId,2);//释放缓冲区
  129.                 sigSem(semSetId,0);//告知生产者,有空间了
  130.                 sleep(2);//消费频率
  131.             }
  132.             break;//务必有
  133.         }
  134.     }
  135.     
  136.     
  137.     //父进程开始生产
  138.     if(child > 0)
  139.     {
  140.         while(1)
  141.         {
  142.             waitSem(semSetId,0);//获取一个空间用于存放产品
  143.             waitSem(semSetId,2);//占有产品缓冲区
  144.             produce();
  145.             sigSem(semSetId,2);//释放产品缓冲区
  146.             sleep(1);//每两秒生产一个
  147.             sigSem(semSetId,1);//告知消费者有产品了
  148.         }
  149.     }
  150.     return 0;
  151. }


互斥量实际上相当于二元信号量,它是纯天然适合生产者消费者问题的解决方案,使用互斥量可以很好地描述生产者或者消费者独占缓冲区的特点。
不过互斥量的能力也仅此而已,如果需要在使用线程方案时提供更复杂的逻辑,则需要配合使用条件变量。生产者要求在缓冲区不满的情况下才能生产,我用 notFull 条件变量表示这种情况;消费者要求在缓冲区不空的情况下才能消费,我用 notEmpty 条件描述这种情况。
条件变量与互斥量配合以实现线程的同步,通常是以下结构:


点击(此处)折叠或打开

  1. pthread_mutex_wait(&mtx);
  2. while(condition not fullfilled){
  3.     pthread_cond_wait(&cond,&mtx);
  4. }
  5. //在此处修改缓冲区
  6. pthread_mutex_signal(&mtx);

其中,cond 是 pthread_cond_t 类型的对象,mtx 是 pthread_mutex_t 类型的对象。pthread_cond_wait 在不同条件下行为不同:
1. 当执行 pthread_cond_wait 时,作为一个原子操作包含以下两步:
1) 解锁互斥量 mtx
2) 阻塞进程直到其它线程调用 pthread_cond_signal 以告知 cond 可以不阻塞

2. 当执行 pthread_cond_signal(&cond) 时,作为原子操作包含以下两步:
1) 给 mtx 加锁
2)停止阻塞线程, 因而得以再次执行循环,判断条件是否满足。(注意到此时 mtx 仍然被当前线程独有,保证互斥)


点击(此处)折叠或打开

  1. #include "stdio.h"
  2. #include <stdlib.h>
  3. #include <pthread.h>


  4. #define N_CONSUMER 3 //消费者数量
  5. #define N_PRODUCER 2 //生产者数量
  6. #define C_SLEEP 1 //控制 consumer 消费的节奏
  7. #define P_SLEEP 1 //控制 producer 生产的节奏

  8. pthread_t ctid[N_CONSUMER];//consumer thread id
  9. pthread_t ptid[N_PRODUCER];//producer thread id

  10. pthread_cond_t notFull,notEmpty;//缓冲区不满;缓冲区不空
  11. pthread_mutex_t buf = PTHREAD_MUTEX_INITIALIZER;//用于锁住缓冲区

  12. //从 begin 到 end(不含end) 代表产品
  13. //cnt 代表产品数量
  14. //max 代表库房的容量,即最多生产多少产品
  15. int begin = 0,end = 0, cnt = 0, max = 4;

  16. void * consumer(void * pidx)//consumer thread idx
  17. {
  18.     printf("consumer thread id %d\n",*((int *)pidx));
  19.     while(1)
  20.     {
  21.         pthread_mutex_lock(&buf);
  22.         while(cnt == 0){//当缓冲区空时
  23.             pthread_cond_wait(?Empty,&buf);
  24.         }
  25.         printf("consume %d\n",begin);
  26.         begin = (begin+1)%max;
  27.         cnt--;
  28.         pthread_mutex_unlock(&buf);
  29.         sleep(C_SLEEP);
  30.         pthread_cond_signal(?Full);
  31.     }
  32.     pthread_exit((void *)0);
  33. }
  34. void * producer(void * pidx)//producer thread idx
  35. {
  36.     printf("producer thread id %d\n",*((int *)pidx));
  37.     while(1)
  38.     {
  39.         pthread_mutex_lock(&buf);
  40.         while(cnt == max){//当缓冲区满时
  41.             pthread_cond_wait(?Full,&buf);
  42.         }
  43.         printf("produce %d\n",end);
  44.         end = (end+1)%max;
  45.         cnt++;
  46.         pthread_mutex_unlock(&buf);
  47.         sleep(P_SLEEP);
  48.         pthread_cond_signal(?Empty);
  49.     }
  50.     pthread_exit((void *)0);
  51. }

  52. int main()
  53. {
  54.     int i = 0;
  55.     for(i = 0; i < N_CONSUMER; i++)
  56.     {
  57.         ;int * j = (int *) malloc(sizeof(int));
  58.         *j = i;
  59.         if(pthread_create(&ctid[i],NULL,consumer,j) != 0)
  60.         {
  61.             perror("create consumer failed\n");
  62.             exit(1);
  63.         }
  64.     }
  65.     for(i = 0; i < N_PRODUCER; i++)
  66.     {
  67.         int * j = (int *) malloc(sizeof(int));
  68.         *j = i;
  69.         if(pthread_create(&ptid[i],NULL,producer,j) != 0)
  70.         {
  71.             perror("create producer failed\n");
  72.             exit(1);
  73.         }
  74.     }
  75.     while(1)
  76.     {
  77.         sleep(10);
  78.     }
  79.     return 0;
  80. }





阅读(5056) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~