Chinaunix首页 | 论坛 | 博客
  • 博客访问: 403580
  • 博文数量: 87
  • 博客积分: 2571
  • 博客等级: 少校
  • 技术积分: 920
  • 用 户 组: 普通用户
  • 注册时间: 2009-12-29 13:10
文章分类

全部博文(87)

文章存档

2012年(49)

2011年(7)

2010年(26)

2009年(5)

分类: C/C++

2012-05-11 14:50:29


  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <unistd.h>
  4. #include <pthread.h>

  5. #define QUEUE_SIZE 8 //环形队列大小
  6. #define GET_POS(x) ((x+1)%QUEUE_SIZE)
  7. #define GET_SIZE(x,y) (x-y)

  8. #ifndef __CBASEDATA_H__
  9. #define __CBASEDATA_H__

  10. class CBaseData
  11. {
  12. public:
  13.     CBaseData(const int& data = 0);
  14.     void set(const int& data);
  15.     int get();    
  16.     int m_data;
  17. };

  18. CBaseData::CBaseData(const int& data)
  19. {
  20.     m_data = data;
  21. }

  22. void CBaseData::set(const int& data)
  23. {
  24.     m_data = data;
  25.     printf("_set %d\n",m_data);
  26. }

  27. int CBaseData::get()
  28. {
  29.     printf("_get %d\n",m_data);
  30.     return m_data;
  31. }

  32. #endif

  33. #ifndef __CPV_H__
  34. #define __CPV_H__
  
  //P-V模型,使用环形队列保存
  1. class CPV
  2. {
  3. public:
  4.     static CPV* getInstance();
  5.     int put(CBaseData& data);
  6.     int get(CBaseData& data);

  7. protected:
  8.     CPV();
  9.     ~CPV();
  10.     void _clean();
  11.     
  12. private:
  13.     CBaseData m_pBaseData[QUEUE_SIZE];//将要保存的是实例,而不是实例的指针(这里不太好,但对指针的操作有点问题,暂时如此)
  14.     int nRead;
  15.     int nWrite;
  16.     pthread_mutex_t lock;
  17.     pthread_cond_t noempty;
  18.     pthread_cond_t nofull;    
  19. };

  20. CPV::CPV()
  21. {
  22.     nRead = nWrite = 0;
  23.     pthread_mutex_init(&lock, NULL);
  24.     pthread_cond_init(&noempty, NULL);
  25.     pthread_cond_init(&nofull, NULL);
  26. }

  27. CPV* CPV::getInstance()
  28. {
  29.     static CPV inst;
  30.     return &inst;
  31. }

  32. void CPV::_clean()
  33. {
  34. //nothing
  35. }

  36. CPV::~CPV()
  37. {
  38.     _clean();
  39.     pthread_mutex_destroy(&lock);
  40.     pthread_cond_destroy(&noempty);
  41.     pthread_cond_destroy(&nofull);
  42. }

  43. int CPV::put(CBaseData& data)
  44. {
  45.     pthread_mutex_lock(&lock);
  46.     if( (GET_POS(nWrite) == GET_POS(nRead)) && (nWrite > nRead) )
  47.     {
  48.         pthread_cond_wait(&nofull, &lock);
  49.     }
  50.     m_pBaseData[GET_POS(nWrite)] = data;
  51.     printf("nWrite=%d\n",nWrite);
  52.     nWrite++;
  53.     
  54.     pthread_cond_signal(&noempty);
  55.     pthread_mutex_unlock(&lock);
  56.     return 0;
  57. }

  58. int CPV::get(CBaseData& data)
  59. {
  60.     pthread_mutex_lock(&lock);
  61.     if( (GET_POS(nWrite) == GET_POS(nRead)) && (nWrite == nRead))
  62.     {
  63.         pthread_cond_wait(&noempty, &lock);
  64.     }    
  65.     printf("nRead=%d\n",nRead);
  66.     data = m_pBaseData[GET_POS(nRead)];
  67.     printf("nRead=%d\n",nRead);
  68.     nRead++;
  69.     
  70.     pthread_cond_signal(&nofull);
  71.     pthread_mutex_unlock(&lock);
  72.     return 0;    
  73. }

  74. #endif

  1. //test

 //要传递给线程的配置项
  1. struct thread_conf
  2. {
  3.         int max_jobs;//每个线程执行的任务数,达到该阀值后,线程将退出
  4. };
  5. typedef struct thread_conf thread_conf_t;
  1. static int thread_cnt = 2;//最大消费者线程数
  2. static int cur_thread_cnt = 0;//当前线程数

  3. void *thread_P(void *arg)
  4. {
  5.     CPV *cp = CPV::getInstance();
  6.     int i = 100;
  7.     CBaseData pbd;
  8.     while(1)
  9.     {    
  10.         printf("put %d\n",i);
  11.         pbd.set(i);
  12.         cp->put(pbd);
  13.         i++;
  14.         sleep(1);
  15.     }
  16. }

  17. void *thread_V(void *arg)
  18. {
  19.     CPV *cv = CPV::getInstance();
  20.     int cnt = 0;
  21.     thread_conf_t *tc = (thread_conf_t *)arg;
  22.     int max = tc->max_jobs;

  23.     pthread_t tid = pthread_self();
  24.     while(max == 0 || cnt<max)//max为0时,认为没有设置阀值
  25.     {
  26.         CBaseData pbd;
  27.         cv->get(pbd);
  28.         printf("<----thread %d, get %d\n",(int)tid,pbd.get());
  29.         cnt++;
  30.     }
  31.     cur_thread_cnt--;
  32. }

  33. //一个简单的线程池管理者
  34. int thread_manager()
  35. {
  36.     thread_conf_t tc;
  37.     tc.max_jobs = 3;

  38.     pthread_t t_p;
  39.     int ret = pthread_create(&t_p,NULL,thread_P, NULL);
  40.     while(1)
  41.     {
  42.         if( cur_thread_cnt < thread_cnt)
  43.         {
  44.             pthread_t t_v;
  45.             pthread_create(&t_v,NULL,thread_V,(void *)&tc);
  46.             cur_thread_cnt++;    
  47.         }
  48. sleep(1);//让cpu喘口气
  49.     }
  50.     return 0;
  51. }


  52. int main()
  53. {
  54.     thread_manager();
  55.     return 0;    
  56. }

阅读(3010) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~