Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1885114
  • 博文数量: 496
  • 博客积分: 12043
  • 博客等级: 上将
  • 技术积分: 4778
  • 用 户 组: 普通用户
  • 注册时间: 2010-11-27 14:26
文章分类

全部博文(496)

文章存档

2014年(8)

2013年(4)

2012年(181)

2011年(303)

2010年(3)

分类: C/C++

2011-12-06 14:37:09

  1. // controlled_module_ex.hpp : controlled_module类的扩展
  2. // 增强线程之间消息通讯
  3. // 增加线程安全启动和安全关闭功能
  4. // 增加定时器功能

  5. #pragma once
  6. #include 
  7. #include 
  8. #include "controlled_module.hpp"

  9. struct _command
  10. {
  11.     typedef boost::shared_ptr<_command> CCmdPtr;
  12.     unsigned int nCmd;
  13.     boost::any anyParam;
  14. };
  15. struct _wait_command
  16. {       
  17.     boost::any par;
  18.     unsigned int command;
  19.     void * event;
  20.     boost::shared_ptr resp;
  21. };
  22. class controlled_module_ex;
  23. struct _notify
  24. {
  25.     controlled_module_ex * sender;
  26.     int id;
  27.     boost::any par;
  28. };
  29. #define BM_RESERVE 1000
  30. #define BM_RING_START BM_RESERVE+1
  31. #define BM_RING_STOP BM_RESERVE+2
  32. #define BM_RING_SETTIME  BM_RESERVE+3
  33. #define BM_RING_SETPARENT BM_RESERVE+4
  34. #define BM_RING_CYCLE BM_RESERVE+5
  35. #define BM_RING_PROCESS BM_RESERVE+6
  36. #define BM_RING_PROCESSEND BM_RESERVE+7
  37. #define BM_RING_PROCESSFAIL BM_RESERVE+8
  38. #define BM_TIMER    BM_RESERVE+9
  39. #define BM_COMMAND  BM_RESERVE+10
  40. #define BM_NOTIFY   BM_RESERVE+11

  41. #define BM_USER 9000
  42. class controlled_timer;
  43. class controlled_module_ex: public controlled_module
  44. {
  45. public:
  46.     controlled_module_ex()
  47.     {
  48.         m_safe = false;
  49.     }
  50.     ~controlled_module_ex()
  51.     {
  52.         safestop();
  53.     }
  54. public:
  55.     template<typename T>
  56.     bool postmessage(unsigned int nCmd, const boost::shared_ptr& p)
  57.     {
  58.         if(this==0||!m_safe)return false;
  59.         boost::mutex::scoped_lock lock(m_mutex_command);
  60.         _command::CCmdPtr cmd(new _command);
  61.         cmd->nCmd = nCmd;
  62.         cmd->anyParam = p;
  63.         m_list_command.push_back(cmd);
  64.         return true;
  65.     }
  66.     boost::any execute(unsigned int command,boost::any par,int timeout=-1)
  67.     {
  68.         boost::shared_ptr<_wait_command> shared(new _wait_command);
  69.         _wait_command & cmd = *shared;
  70.         cmd.command = command;
  71.         cmd.event = (void *)CreateEvent(0,FALSE,FALSE,0);
  72.         cmd.par = par;
  73.         cmd.resp = boost::shared_ptr(new boost::any);
  74.         if(this->postmessage(BM_COMMAND,shared))
  75.         {
  76.             DWORD dw = WaitForSingleObject(cmd.event,timeout);
  77.             CloseHandle(cmd.event);
  78.             if(dw!=WAIT_OBJECT_0)
  79.                 return boost::any();
  80.             else
  81.                 return *cmd.resp;
  82.         }
  83.         else
  84.         {
  85.             CloseHandle(cmd.event);
  86.             return boost::any();
  87.         }
  88.     }
  89.     void notify(_notify p)
  90.     {
  91.         this->postmessage(BM_NOTIFY,p);
  92.     }
  93.     bool postmessage(unsigned int nCmd,boost::any p)
  94.     {
  95.         if(this==0||!m_safe)
  96.             return false;
  97.         boost::mutex::scoped_lock lock(m_mutex_command);
  98.         _command::CCmdPtr cmd(new _command);
  99.         cmd->nCmd = nCmd;
  100.         cmd->anyParam = p;
  101.         m_list_command.push_back(cmd);
  102.         return true;
  103.     }
  104.     bool postmessage(unsigned int nCmd)
  105.     {
  106.         if(this==0||!m_safe)
  107.             return false;
  108.         boost::mutex::scoped_lock lock(m_mutex_command);
  109.         _command::CCmdPtr cmd(new _command);
  110.         cmd->nCmd = nCmd;
  111.         cmd->anyParam = 0;
  112.         m_list_command.push_back(cmd);
  113.         return true;
  114.     }
  115.     virtual bool work()
  116.     {
  117.         if(!getmessage())
  118.             return false;
  119.         else
  120.         {
  121.             Sleep(this->m_sleeptime);
  122.             return true;
  123.         }
  124.     }
  125.     virtual void message(const _command & cmd)
  126.     {
  127.         if(cmd.nCmd==BM_RING_START)
  128.         {
  129.             this->on_safestart();
  130.         }
  131.         else if(cmd.nCmd==BM_RING_STOP)
  132.         {
  133.             this->on_safestop();
  134.         }
  135.         else if(cmd.nCmd==BM_TIMER)
  136.         {
  137.             this->on_timer(boost::any_cast(cmd.anyParam));
  138.         }
  139.         else if(cmd.nCmd==BM_COMMAND)
  140.         {
  141.             boost::shared_ptr<_wait_command> shared = boost::any_cast< boost::shared_ptr<_wait_command> >(cmd.anyParam);
  142.             _wait_command & cmd = *shared;
  143.             *cmd.resp = this->on_command(cmd.command,cmd.par);
  144.             SetEvent((HANDLE)cmd.event);
  145.         }
  146.         else if(cmd.nCmd==BM_NOTIFY)
  147.         {
  148.             try
  149.             {
  150.                 _notify par = boost::any_cast<_notify>(cmd.anyParam);
  151.                 this->on_notify(par);
  152.             }
  153.             catch(boost::bad_any_cast)
  154.             {
  155.             }
  156.         }
  157.     }
  158.     virtual void release()
  159.     {
  160.         boost::mutex::scoped_lock lock(m_mutex_command);
  161.         m_list_command.clear();
  162.     }
  163.     void safestart()
  164.     {
  165.         if(!islive())
  166.             start();
  167.         m_safe = true;
  168.         m_safestart_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
  169.         postmessage(BM_RING_START);
  170.         ::WaitForSingleObject((HANDLE)m_safestart_event,INFINITE);
  171.         CloseHandle(m_safestart_event);
  172.     }
  173.     void safestop()
  174.     {
  175.     if(this->islive())
  176.     {
  177.         m_safe = false;
  178.         m_safestop_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
  179.         {
  180.             boost::mutex::scoped_lock lock(m_mutex_command);
  181.             _command::CCmdPtr cmd(new _command);
  182.             cmd->nCmd = BM_RING_STOP;
  183.             cmd->anyParam = 0;
  184.             m_list_command.push_back(cmd);
  185.         }
  186.         DWORD dw = ::WaitForSingleObject((HANDLE)m_safestop_event,3*1000);
  187.         if(WAIT_OBJECT_0!=dw)
  188.         {
  189.         }
  190.         CloseHandle(m_safestop_event);
  191.         stop();
  192.     }
  193.     }
  194.     virtual void on_timer(const controlled_timer * p){}
  195.     virtual void on_safestart()
  196.     {
  197.         SetEvent(m_safestart_event);
  198.     }
  199.     virtual void on_safestop()
  200.     {
  201.         SetEvent(m_safestop_event);
  202.     }
  203.     virtual void on_notify(const _notify & p)
  204.     {
  205.     }
  206. protected:
  207.     virtual boost::any on_command(const unsigned int command,const boost::any par)
  208.     {
  209.         return boost::any();
  210.     }
  211.     bool getmessage()
  212.     {
  213.         std::list<_command::CCmdPtr> cache;
  214.         {
  215.             boost::mutex::scoped_lock lock(m_mutex_command);
  216.             while(!m_list_command.empty())
  217.             {
  218.                 _command::CCmdPtr p = m_list_command.front();
  219.                 m_list_command.pop_front();
  220.                 cache.push_back(p);
  221.             }
  222.         }
  223.         _command::CCmdPtr stop_command;
  224.         std::list<_command::CCmdPtr>::iterator item;
  225.         for(item = cache.begin();item!=cache.end();item++)
  226.         {
  227.             if((*(*item)).nCmd==BM_RING_STOP)
  228.             {
  229.                 stop_command = *item;               
  230.                 break;
  231.             }
  232.         }
  233.         if(stop_command.get()==0)
  234.         {
  235.             while(!cache.empty())
  236.             {
  237.                 _command::CCmdPtr p = cache.front();
  238.                 cache.pop_front();
  239.                 try
  240.                 {
  241.                     if((*p).nCmd!=BM_RING_START)
  242.                     {
  243.                         if(!this->m_safe)
  244.                             continue;
  245.                     }
  246.                     this->message(*p);
  247.                 }
  248.                 catch(boost::bad_any_cast &)
  249.                 {
  250.                 }
  251.             }
  252.             return true;
  253.         }
  254.         else
  255.         {
  256.             cache.clear();
  257.             this->message(*stop_command);
  258.             return false;
  259.         }
  260.     }
  261. private:
  262.     void*   m_safestart_event;
  263.     void* m_safestop_event;
  264.     bool m_safe;//在多线程,尤其牵涉到线程之间有类似socket级别关联时,当父线程safestop以后有可能会收到其他线程的postmessage,这时会引起线程死锁,这个m_safe就是解决这个问题的,当safestop以后不再接收新消息处理
  265.     boost::mutex m_mutex_command;
  266.     std::list<_command::CCmdPtr> m_list_command;
  267. };
  268. class controlled_timer: public controlled_module_ex
  269. {
  270. public:
  271.   controlled_timer()
  272.   {
  273.     this->m_time = 0;
  274.     this->m_parent = 0;
  275.     this->m_step = 0;
  276.   }
  277.   ~controlled_timer(){
  278.   }
  279. protected:
  280.   controlled_module_ex* m_parent;
  281.   int m_time;
  282.   int m_step;
  283. public:
  284.   void starttimer(int time,controlled_module_ex* parent)
  285.   {
  286.     this->safestart();
  287.     this->postmessage(BM_RING_SETPARENT,parent);
  288.     this->postmessage(BM_RING_SETTIME,time);
  289.   }
  290.   void stoptimer()
  291.   {
  292.     this->safestop();
  293.   }
  294. public:
  295.   virtual void on_safestop()
  296.   {
  297.     m_time = 0;
  298.     controlled_module_ex::on_safestop();
  299.   }
  300.   virtual void message(const _command & cmd)
  301.   {
  302.     controlled_module_ex::message(cmd);
  303.     if(cmd.nCmd==BM_RING_SETTIME)
  304.     {
  305.         int time = boost::any_cast<int>(cmd.anyParam);
  306.         this->m_time = time/this->m_sleeptime;
  307.         this->postmessage(BM_RING_CYCLE);
  308.     }
  309.     else if(cmd.nCmd==BM_RING_SETPARENT)
  310.     {
  311.         this->m_parent  = boost::any_cast(cmd.anyParam);
  312.     }   
  313.     else if(cmd.nCmd==BM_RING_CYCLE)
  314.     {
  315.         if(m_time>0)
  316.         {
  317.             if(m_step>m_time)
  318.             {
  319.                 m_parent->postmessage(BM_TIMER,this);
  320.                 m_step=0;
  321.             }
  322.             m_step++;
  323.         }
  324.         this->postmessage(BM_RING_CYCLE);
  325.     } 
  326.   }
  327. };
1.向线程PostMessage
  函数controlled_module_ex::postmessage完成消息推送。
  虚拟函数controlled_module_ex::message(const _command & cmd)实现消息接收。
  
  1. #include "controlled_module_ex.hpp"


  2. class thdex: public controlled_module_ex
  3. {
  4. protected:
  5.     virtual void message(const _command & cmd)
  6.     {
  7.         controlled_module_ex::message(cmd);
  8.         if(cmd.nCmd==BM_USER+1)
  9.         {
  10.             cout << "get message" << endl;
  11.         }
  12.     }
  13. };

  14. int _tmain(int argc, _TCHAR* argv[])
  15. {
  16.     thdex t;
  17.     t.safestart();
  18.     t.postmessage(BM_USER+1);
  19.     char buf[10];
  20.     gets_s(buf,sizeof buf);
  21.     t.safestop();
  22.     return 0;
  23. }
  2.向线程PostMessage,并携带简单对象参数
 我们都知道常规的PostMessage要传参,如果是整型参数,还可以用强制转换,但如果是其他类型,例如字符串,我们就必须创建一个字符 串缓冲,把缓冲指针作为参数传过去,线程还不能忘记删除,否则导致内存泄漏,自定义结构也是一样的操作,如果想尝试传递一个CString对象,是不可能 的。
  幸运的是boost提供了boost::any来抽象任何对象类型,controlled_module_ex的消息传递都是基于boost::any来完成,程序员可以由此写出干净而且内存安全的代码。
  1. #include "controlled_module_ex.hpp"

  2. struct mystruct
  3. {
  4.     string a;
  5.     int b;
  6. };
  7. class thdex: public controlled_module_ex
  8. {
  9. protected:
  10.     virtual void message(const _command & cmd)
  11.     {
  12.         controlled_module_ex::message(cmd);
  13.         if(cmd.nCmd==BM_USER+1)
  14.         {
  15.             cout << "get integer:" << boost::any_cast<int>(cmd.anyParam) << endl;
  16.         }
  17.         if(cmd.nCmd==BM_USER+2)
  18.         {
  19.             cout << "get string:" << boost::any_cast(cmd.anyParam) << endl;
  20.         }
  21.         if(cmd.nCmd==BM_USER+3)
  22.         {
  23.             mystruct par = boost::any_cast(cmd.anyParam);
  24.             cout << "get mystruct:" << par.a << "," << par.b << endl;
  25.         }
  26.     }
  27. };

  28. int _tmain(int argc, _TCHAR* argv[])
  29. {
  30.     thdex t;
  31.     t.safestart();
  32.     t.postmessage(BM_USER+1,123);
  33.     t.postmessage(BM_USER+2,string("hello world"));
  34.     mystruct par;
  35.     par.a = "hello world";
  36.     par.b = 123;
  37.     t.postmessage(BM_USER+3,par);

  38.     char buf[10];
  39.     gets_s(buf,sizeof buf);
  40.     t.safestop();
  41.     return 0;
  42. }


3.向线程PostMessage,并传递内存块参数
  假如我们书写了一个录音子线程,如何将录制的语音数据传递给其他线程呢,常规做法是创建一个缓冲,将语音数据填充进去,然后将缓冲地址作为参数传递,这种做法要求接收线程不能忘记删除,否则会导致内存泄漏。
  幸运的是boost提供了智能指针,可以类似java,c#的智能内存回收一样来管理内存分配,我们如果使用这个对象来作为参数传递,就可以完美的防范内存泄漏行为,就算子线程没有处理,别担心,内存它会自动回收的。
  1. #include "controlled_module_ex.hpp"

  2. struct mystruct
  3. {
  4.     boost::shared_ptr<char> data;
  5.     int datalen;
  6. };
  7. class thdex: public controlled_module_ex
  8. {
  9. protected:
  10.     virtual void message(const _command & cmd)
  11.     {
  12.         controlled_module_ex::message(cmd);
  13.         if(cmd.nCmd==BM_USER+1)
  14.         {
  15.             cout << "get sharedptr" << endl; //仅仅得到数据,得不到数据长度
  16.         }
  17.         if(cmd.nCmd==BM_USER+2)
  18.         {
  19.             mystruct par = boost::any_cast(cmd.anyParam);
  20.             cout << "get sharedptr len:" << par.datalen << endl;
  21.         }
  22.     }
  23. };

  24. int _tmain(int argc, _TCHAR* argv[])
  25. {
  26.     thdex t;
  27.     t.safestart();
  28.     t.postmessage(BM_USER+1,boost::shared_ptr<char>(new char[1000]));
  29.     mystruct par;
  30.     par.datalen = 1000;
  31.     par.data = boost::shared_ptr<char>(new char[par.datalen]);
  32.     t.postmessage(BM_USER+2,par);

  33.     char buf[10];
  34.     gets_s(buf,sizeof buf);
  35.     t.safestop();
  36.     return 0;
  37. }


3.向线程SendMessage
  函数controlled_module_ex::execute完成这个工作
  虚拟函数controlled_module_ex::on_command(const unsigned int command,const boost::any par)响应消息处理
  1. #include "controlled_module_ex.hpp"

  2. class thdex: public controlled_module_ex
  3. {
  4. protected:
  5.     boost::any on_command(const unsigned int command,const boost::any par)
  6.     {
  7.         if(command==1)
  8.         {
  9.             cout << "on command" << endl;
  10.             return 0;
  11.         }
  12.         if(command==2)
  13.         {
  14.             cout << "on command,par:" << boost::any_cast(par) << endl;
  15.             return 0;
  16.         }
  17.         if(command==3)
  18.         {
  19.             return true;
  20.         }
  21.         else
  22.             return controlled_module_ex::on_command(command,par);
  23.     }
  24. };

  25. int _tmain(int argc, _TCHAR* argv[])
  26. {
  27.     thdex t;
  28.     t.safestart();
  29.     t.execute(1,0);//等待子线程处理完成
  30.     t.execute(2,string("hello world"));//带参数 等待子线程完成
  31.     bool rs = boost::any_cast<bool>(t.execute(3,0));//等待子线程处理完成,并取得返回值
  32.     cout << "get thread result:" << rs << endl;
  33.     boost::any timeout = t.execute(4,0,1000);//等待子线程处理,超时1秒
  34.     if(timeout.empty())
  35.         cout << "timeout " << endl;

  36.     char buf[10];
  37.     gets_s(buf,sizeof buf);
  38.     t.safestop();
  39.     return 0;
  40. }
4.定时器
  类似于CWnd::OnTimer,controlled_module_ex也提供一个虚拟函数virtual void on_timer(const controlled_timer * p);来处理定时
  1. #include "controlled_module_ex.hpp"

  2. class thdex: public controlled_module_ex
  3. {
  4. protected:
  5.     virtual void on_timer(const controlled_timer *p)
  6.     {
  7.         cout << "ontimer" << endl;
  8.     }
  9. };

  10. int _tmain(int argc, _TCHAR* argv[])
  11. {
  12.     thdex t;
  13.     controlled_timer timer;
  14.     t.safestart();
  15.     timer.starttimer(1000,&t);

  16.     char buf[10];
  17.     gets_s(buf,sizeof buf);
  18.     t.safestop();
  19.     return 0;
  20. }
阅读(991) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~