Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1276736
  • 博文数量: 247
  • 博客积分: 5587
  • 博客等级: 大校
  • 技术积分: 2060
  • 用 户 组: 普通用户
  • 注册时间: 2010-02-24 13:27
文章分类
文章存档

2012年(101)

2011年(44)

2010年(102)

分类:

2010-11-07 22:50:30

线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。

 

在linux中,使用的是posix线程库,首先介绍几个常用的函数:

1 线程的创建和取消函数

pthread_create

创建线程

pthread_join

合并线程

pthread_cancel

取消线程

2 线程同步函数

pthread_mutex_lock

pthread_mutex_unlock

pthread_cond_signal

pthread_cond_wait

 

关于函数的详细说明,参考man手册

 

线程池的实现:

线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

主要有两个类来实现,CTask,CThreadPool

/**
执行任务的类,设置任务数据并执行
**/

C代码 
  1. class CTask  
  2. {  
  3. protected:  
  4.  string m_strTaskName;  //任务的名称  
  5.  void* m_ptrData;       //要执行的任务的具体数据  
  6. public:  
  7.  CTask(){}  
  8.  CTask(string taskName)  
  9.  {  
  10.   this->m_strTaskName = taskName;  
  11.   m_ptrData = NULL;  
  12.  }  
  13.  virtual int Run()= 0;  
  14.  void SetData(void* data);    //设置任务数据  
  15. };  

 

任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。

 

线程池类

/**
线程池
**/

Java代码 
  1. class CThreadPool  
  2. {  
  3. private:  
  4.  vector m_vecTaskList;         //任务列表  
  5.  int m_iThreadNum;                            //线程池中启动的线程数             
  6.  static vector m_vecIdleThread;   //当前空闲的线程集合  
  7.  static vector m_vecBusyThread;   //当前正在执行的线程集合  
  8.  static pthread_mutex_t m_pthreadMutex;    //线程同步锁  
  9.  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量  
  10. protected:  
  11.  static void* ThreadFunc(void * threadData); //新线程的线程函数  
  12.  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中  
  13.  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去  
  14.  int Create();          //创建所有的线程  
  15. public:  
  16.  CThreadPool(int threadNum);  
  17.  int AddTask(CTask *task);      //把任务添加到线程池中  
  18.  int StopAll();  
  19. };  

 

当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

线程之间的同步用线程锁和条件变量。

这个类的对外接口有两个:

AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

StopAll函数停止所有的线程

 

Cpp代码 
  1. ************************************************  
  2.   
  3. 代码:  
  4.   
  5. ××××××××××××××××××××CThread.h  
  6.   
  7.    
  8.   
  9. #ifndef __CTHREAD  
  10. #define __CTHREAD  
  11. #include   
  12. #include   
  13. #include   
  14.   
  15. using namespace std;  
  16.   
  17. /** 
  18. 执行任务的类,设置任务数据并执行 
  19. **/  
  20. class CTask  
  21. {  
  22. protected:  
  23.  string m_strTaskName;  //任务的名称  
  24.  void* m_ptrData;       //要执行的任务的具体数据  
  25. public:  
  26.  CTask(){}  
  27.  CTask(string taskName)  
  28.  {  
  29.   this->m_strTaskName = taskName;  
  30.   m_ptrData = NULL;  
  31.  }  
  32.  virtual int Run()= 0;  
  33.  void SetData(void* data);    //设置任务数据  
  34. };  
  35.   
  36. /** 
  37. 线程池 
  38. **/  
  39. class CThreadPool  
  40. {  
  41. private:  
  42.  vector m_vecTaskList;         //任务列表  
  43.  int m_iThreadNum;                            //线程池中启动的线程数             
  44.  static vector m_vecIdleThread;   //当前空闲的线程集合  
  45.  static vector m_vecBusyThread;   //当前正在执行的线程集合  
  46.  static pthread_mutex_t m_pthreadMutex;    //线程同步锁  
  47.  static pthread_cond_t m_pthreadCond;    //线程同步的条件变量  
  48. protected:  
  49.  static void* ThreadFunc(void * threadData); //新线程的线程函数  
  50.  static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中  
  51.  static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去  
  52.  int Create();          //创建所有的线程  
  53. public:  
  54.  CThreadPool(int threadNum);  
  55.  int AddTask(CTask *task);      //把任务添加到线程池中  
  56.  int StopAll();  
  57. };  
  58.   
  59. #endif  
  60.   
  61.    
  62.   
  63.    
  64.   
  65.    
  66.   
  67. 类的实现为:  
  68.   
  69. ××××××××××××××××××××CThread.cpp  
  70.   
  71.    
  72.   
  73. #include "CThread.h"  
  74. #include   
  75. #include   
  76.   
  77. using namespace std;  
  78.   
  79. void CTask::SetData(void * data)  
  80. {  
  81.  m_ptrData = data;  
  82. }  
  83.   
  84. vector CThreadPool::m_vecBusyThread;  
  85. vector CThreadPool::m_vecIdleThread;  
  86. pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;  
  87. pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;  
  88.   
  89. CThreadPool::CThreadPool(int threadNum)  
  90. {  
  91.  this->m_iThreadNum = threadNum;  
  92.  Create();  
  93. }  
  94. int CThreadPool::MoveToIdle(pthread_t tid)  
  95. {  
  96.  vector::iterator busyIter = m_vecBusyThread.begin();  
  97.  while(busyIter != m_vecBusyThread.end())  
  98.  {  
  99.   if(tid == *busyIter)  
  100.   {  
  101.    break;  
  102.   }  
  103.   busyIter++;  
  104.  }  
  105.  m_vecBusyThread.erase(busyIter);  
  106.  m_vecIdleThread.push_back(tid);  
  107.  return 0;  
  108. }  
  109.   
  110. int CThreadPool::MoveToBusy(pthread_t tid)  
  111. {  
  112.  vector::iterator idleIter = m_vecIdleThread.begin();  
  113.  while(idleIter != m_vecIdleThread.end())  
  114.  {  
  115.   if(tid == *idleIter)  
  116.   {  
  117.    break;  
  118.   }  
  119.   idleIter++;  
  120.  }  
  121.  m_vecIdleThread.erase(idleIter);  
  122.  m_vecBusyThread.push_back(tid);  
  123.  return 0;  
  124. }  
  125. void* CThreadPool::ThreadFunc(void * threadData)  
  126. {  
  127.  pthread_t tid = pthread_self();  
  128.  while(1)  
  129.  {  
  130.   pthread_mutex_lock(&m_pthreadMutex);  
  131.   pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);  
  132.   cout << "tid:" << tid << " run" << endl;  
  133.   //get task  
  134.   vector* taskList = (vector*)threadData;  
  135.   vector::iterator iter = taskList->begin();  
  136.   while(iter != taskList->end())  
  137.   {  
  138.      
  139.    MoveToBusy(tid);  
  140.    break;  
  141.   }  
  142.   CTask* task = *iter;  
  143.   taskList->erase(iter);  
  144.   pthread_mutex_unlock(&m_pthreadMutex);  
  145.   cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;  
  146.   cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;  
  147.   //cout << "task to be run:" << taskList->size() << endl;  
  148.   task->Run();  
  149.     
  150.   //cout << "CThread::thread work" << endl;  
  151.   cout << "tid:" << tid << " idle" << endl;  
  152.     
  153.  }  
  154.  return (void*)0;  
  155. }  
  156.   
  157. int CThreadPool::AddTask(CTask *task)  
  158. {  
  159.  this->m_vecTaskList.push_back(task);  
  160.  pthread_cond_signal(&m_pthreadCond);  
  161.  return 0;  
  162. }  
  163. int CThreadPool::Create()  
  164. {  
  165.  for(int i = 0; i < m_iThreadNum;i++)  
  166.  {  
  167.   pthread_t tid = 0;  
  168.   pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);  
  169.   m_vecIdleThread.push_back(tid);  
  170.  }  
  171.  return 0;  
  172. }  
  173.   
  174. int CThreadPool::StopAll()  
  175. {  
  176.  vector::iterator iter = m_vecIdleThread.begin();  
  177.  while(iter != m_vecIdleThread.end())  
  178.  {  
  179.   pthread_cancel(*iter);  
  180.   pthread_join(*iter,NULL);  
  181.   iter++;  
  182.  }  
  183.   
  184.  iter = m_vecBusyThread.begin();  
  185.  while(iter != m_vecBusyThread.end())  
  186.  {  
  187.   pthread_cancel(*iter);  
  188.   pthread_join(*iter,NULL);  
  189.   iter++;  
  190.  }  
  191.    
  192.  return 0;  
  193. }  
  194.   
  195. 简单示例:  
  196.   
  197. ××××××××test.cpp  
  198.   
  199. #include "CThread.h"  
  200. #include   
  201.   
  202. using namespace std;  
  203.   
  204. class CWorkTask: public CTask  
  205. {  
  206. public:  
  207.  CWorkTask()  
  208.  {}  
  209.  int Run()  
  210.  {  
  211.   cout << (char*)this->m_ptrData << endl;  
  212.   sleep(10);  
  213.   return 0;  
  214.  }  
  215. };  
  216. int main()  
  217. {  
  218.  CWorkTask taskObj;  
  219.  char szTmp[] = "this is the first thread running,haha success";  
  220.  taskObj.SetData((void*)szTmp);  
  221.  CThreadPool threadPool(10);  
  222.  for(int i = 0;i < 11;i++)  
  223.  {  
  224.   threadPool.AddTask(&taskObj);  
  225.  }  
  226.  while(1)  
  227.  {  
  228.   sleep(120);  
  229.  }  
  230.  return 0;  
  231. }  
阅读(622) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~