Chinaunix首页 | 论坛 | 博客
  • 博客访问: 958983
  • 博文数量: 116
  • 博客积分: 3923
  • 博客等级: 中校
  • 技术积分: 1337
  • 用 户 组: 普通用户
  • 注册时间: 2009-04-23 01:22
文章分类

全部博文(116)

文章存档

2013年(1)

2012年(17)

2011年(69)

2009年(29)

分类: WINDOWS

2011-11-15 22:34:58

http://vincent.blog.chinaunix.net
最近在阅读电池充电相关的驱动代码,芯片是PM8606和PM8607。大概的原理是:主控芯片通过I2C总线和PM860x芯片通信,linux驱动挂接一个IRQ来响应PM860x的中断;在IRQ的处理代码里通过I2C总线去读PM860x的中断状态寄存器(共3个),根据不同的置位来区分不同的事件,再根据事件向内核的工作队列抛出一个工作去执行相关任务。这个任务的主要内容是根据不同的事件去跑一个有穷状态机的流程:状态机自己设定PM860x的一些电压、电流、温度的门槛,当充电到一定程度、越过这些门槛时就进入下一个状态,然后重新设定相关门槛。另外还有一个由定时器驱动的工作,也会定时地向工作队列抛出任务,主要是读取当前PM860x的电压、电流、温度信息,然后根据这些信息起到安全保护的作用。

其中我对里面的工作队列实现比较有兴趣,就自己用线程池模拟多cpu的情况实现了一个,不过和内核的实现区别还是蛮大的。实现时有一个问题我调试了很久:我仿照linux内核的用法去调用INIT_WORK(work,func)时,work是静态变量,然后把它插入到队列;后来发现这样是有问题的,因为连续插入多个相同的静态work时,work的链表指针已经被修改了,从而引起问题。还有就是多线程同步时,由于有多个锁,经常在函数退出时忘记把其它相关的锁也释放掉,引起死锁……反正就是多练习,多调试……

下面是我实现的原理图:



代码如下:
  1. /**
  2.  * file: main.c
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * @2011-11-06: initial
  6.  * @2011-11-13: add the notifier to register
  7.  */

  8. #include "threadpool.h"
  9. #include "stdio.h"
  10. #include "workqueuex.h"
  11. #include <stdlib.h>
  12. #include <time.h>

  13. static lock_t lock;

  14. void fwork_1(struct work_s *pwork)
  15. {
  16.     /* need to deleted by it self */
  17.     __lockx(lock);
  18.     printf("function: %s[thread_id:%08x] running ...\n", "fwork_1", gettidx());
  19.     fflush(stdout);    
  20.     __unlockx(lock);
  21.     FREE_WORK(pwork);
  22. }

  23. void fwork_2(struct work_s *pwork)
  24. {
  25.     /* need to deleted by it self */
  26.     __lockx(lock);
  27.     printf("function: %s[thread_id:%08x] running ...\n", "fwork_2", gettidx());
  28.     fflush(stdout);    
  29.     __unlockx(lock);
  30.     FREE_WORK(pwork);
  31. }
  32. void fwork_3(struct work_s *pwork)
  33. {
  34.     /* need to deleted by it self */
  35.     __lockx(lock);
  36.     printf("function: %s[thread_id:%08x] running ...\n", "fwork_3", gettidx());
  37.     fflush(stdout);    
  38.     __unlockx(lock);
  39.     FREE_WORK(pwork);
  40. }
  41. void fwork_4(struct work_s *pwork)
  42. {
  43.     /* need to deleted by it self */
  44.     __lockx(lock);
  45.     printf("function: %s[thread_id:%08x] running ...\n", "fwork_4", gettidx());
  46.     fflush(stdout);    
  47.     __unlockx(lock);
  48.     FREE_WORK(pwork);
  49. }

  50. static fwork_t fworks[] = {
  51.     fwork_1,
  52.     fwork_2,
  53.     fwork_3,
  54.     fwork_4,
  55. };

  56. void main()
  57. {
  58.     int count = 100;
  59.     work_t *pnew=null;
  60.     tasks_sched_t *pts = get_tasks_sched();
  61.     __assert(pts);
  62.     __lockx_init(lock,0);
  63.     
  64.     srand ((unsigned)time(NULL));
  65.     
  66.     init_tasks_sched();

  67.     while (count--)
  68.     {
  69.         int n = rand()%ARRAY_SIZE(fworks);
  70.         NEW_WORK(pnew,fworks[n]);
  71.         schedule_work(pnew);
  72.     }
  73.     //while (1);
  74.     sleep(5000);
  75.     exit_tasks_sched();
  76.     __lockx_exit(lock,0);
  77. }

  1. /**
  2.  * file: notifierx.c
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * initial @ 2011-11-13
  6.  */

  7. #include "notifierx.h"


  8. int notifierx_chain_register(struct notifierx_head *pnh,
  9.         struct notifierx_block *pnb)
  10. {
  11.     __assert(pnh&&pnb);
  12.     __lockx(pnh->cslock);
  13.     list_add(&pnb->list, &pnh->head);
  14.     __unlockx(pnh->cslock);    
  15.     return 0;
  16. }

  17. int notifierx_chain_all_unregister(struct notifierx_head *pnh)
  18. {
  19.     struct list_head *list_p;
  20.     notifierx_block_t* pnb_it=null;
  21.     notifierx_block_t* pnb_temp=null;
  22.     __assert(pnh);

  23.     __lockx(pnh->cslock);
  24.     list_for_each_safe(list_p, pnb_temp, &(pnh->head))
  25.     {
  26.         pnb_it = list_entry(list_p, notifierx_block_t, list);
  27.         list_del(&pnb_it->list);
  28.     }
  29.     
  30.     __unlockx(pnh->cslock);

  31.     return 0;
  32. }

  33. int notifierx_chain_unregister(struct notifierx_head *pnh,
  34.         struct notifierx_block *pnb)
  35. {
  36.     struct list_head *list_p;
  37.     struct notifierx_block *pnb_it;
  38.     __assert(pnh&&pnb);
  39.     
  40.     __lockx(pnh->cslock);
  41.     list_for_each(list_p, &(pnh->head))
  42.     {
  43.         pnb_it = list_entry(list_p, notifierx_block_t, list);
  44.         if (pnb_it->notifierx_call == pnb->notifierx_call)
  45.         {
  46.             list_del(&pnb_it->list);
  47.             __unlockx(pnh->cslock);
  48.             return 0;
  49.         }
  50.     }
  51.     __unlockx(pnh->cslock);    
  52.     return -1;
  53. }

  54. int notifierx_call_chain(struct notifierx_head *pnh, unsigned long nlen, void *pdata)
  55. {
  56.     struct list_head *list_p;
  57.     notifierx_block_t* pnb_it=null;

  58.     __assert(pnh);
  59.     
  60.     __lockx(pnh->cslock);
  61.     list_for_each(list_p, &(pnh->head))
  62.     {
  63.         pnb_it = list_entry(list_p, notifierx_block_t, list);
  64.         pnb_it->notifierx_call(pnb_it, nlen, pdata);
  65.     }    
  66.     __unlockx(pnh->cslock);

  67.     return 0;
  68. }

  1. /**
  2.  * file: notifierx.h
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * initial @ 2011-11-13
  6.  */

  7. #ifndef _NOTIFIERX_H_
  8. #define _NOTIFIERX_H_

  9. #include "list.h"
  10. #include "platform.h"

  /* One subscriber on a notifier chain. */
  11. typedef struct notifierx_block {
      /* callback run by notifierx_call_chain(); gets this block plus the caller's (nlen, pdata) */
  12.     int (*notifierx_call)(struct notifierx_block *pnb, unsigned long nlen, const void *pdata);
      /* node linking this block into notifierx_head.head */
  13.     struct list_head list;
  14. }notifierx_block_t;

  /* Head of a notifier chain: the list of registered blocks plus the lock guarding it. */
  15. struct notifierx_head {
      /* taken around every list walk/modification in notifierx.c */
  16.     lock_t cslock;
      /* attribute object handed to __lockx_init()/__lockx_exit() together with cslock */
  17.     attr_t attr;
      /* list of notifierx_block.list nodes */
  18.     struct list_head head;
  19. };

  /* Prepare a struct notifierx_head (passed by name, not by pointer): set up its lock, then empty its list. */
  #define INIT_NOTIFIERX_HEAD(name)                       \
      do {                                                \
          __lockx_init((name).cslock,(name).attr);        \
          INIT_LIST_HEAD(&(name).head);                   \
      } while (0)
  24.         

  /* Tear down a struct notifierx_head (passed by name): unlink every block, then destroy the lock. */
  #define EXIT_NOTIFIERX_HEAD(name)                       \
      do {                                                \
          notifierx_chain_all_unregister(&(name));        \
          __lockx_exit((name).cslock, (name).attr);       \
      } while (0)

  29. extern int notifierx_chain_register(struct notifierx_head *pnh, struct notifierx_block *pnb);
  30. extern int notifierx_chain_all_unregister(struct notifierx_head *pnh);
  31. extern int notifierx_chain_unregister(struct notifierx_head *pnh, struct notifierx_block *pnb);
  32. extern int notifierx_call_chain(struct notifierx_head *pnh, unsigned long nlen, void *pdata);

  33. #endif

  1. /**
  2.  * file: platform.h
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * @2011-11-15: initial
  6.  */

  7. #ifndef _PLATFORM_H_
  8. #define _PLATFORM_H_

  9. #ifdef WIN32
  10. # include "windows.h"
  11. #else
  12. # include <pthread.h>
  13. #endif

  14. #ifndef null
  15. #define null (void*)0
  16. #endif

  17. #ifndef NULL
  18. #define NULL (void*)0
  19. #endif

  20. #ifndef __assert
  21. #define __assert(x)
  22. #endif


  /*
   * Thread-id and sleep shims.
   * NOTE(review): on WIN32 `sleep` is mapped to Sleep(), which takes
   * milliseconds, whereas POSIX sleep() takes seconds - callers passing the
   * same number on both platforms wait for very different times.
   */
  23. #ifdef WIN32
  24. #define sleep Sleep
  25. #define gettidx() GetCurrentThreadId()
  26. #else
  27. #define gettidx() pthread_self()
  28. #endif

  /*
   * Platform type aliases used by the thread pool and work queue:
   *   thrd_handle - thread identifier/handle
   *   event_t     - wake-up primitive (Win32 event / pthread condition variable)
   *   lock_t      - mutual-exclusion primitive
   *   attr_t      - attribute object passed alongside lock_t to the lock macros
   */
  29. #ifdef WIN32
  30. #    define thrd_handle    HANDLE
  31. #    define event_t        HANDLE
  32. #    define lock_t        CRITICAL_SECTION
  33. #    define attr_t        void*
  34. #else
  35. #    define thrd_handle    pthread_t
  36. #    define event_t        pthread_cond_t
  37. #    define lock_t        pthread_mutex_t
  38. #    define attr_t        pthread_mutexattr_t
  39. #endif

  /*
   * Lock primitives.
   *
   *   __lockx(lock) / __unlockx(lock)  acquire / release
   *   __lockx_init(lock, attr)         create the lock (recursive on pthreads)
   *   __lockx_exit(lock, attr)         destroy the lock and its attribute
   *
   * `lock` and `attr` must be lvalues: the macros take their address.
   * Every macro expands to a single statement/expression without a trailing
   * semicolon, so `__lockx_init(a, b);` is safe in un-braced if/else bodies.
   */
  #ifdef WIN32

  #ifndef hnull
  #define hnull INVALID_HANDLE_VALUE
  #endif
  #ifndef null
  #define null NULL
  #endif
  #define __lockx(lock)            EnterCriticalSection(&(lock))
  #define __unlockx(lock)          LeaveCriticalSection(&(lock))
  #define __lockx_init(lock,attr)  InitializeCriticalSection(&(lock))
  #define __lockx_exit(lock,attr)  DeleteCriticalSection(&(lock))

  #else

  #ifndef hnull
  #define hnull ((void*)0)
  #endif
  #ifndef null
  #define null ((void*)0)
  #endif
  #define __lockx(lock)    pthread_mutex_lock(&(lock))
  #define __unlockx(lock)  pthread_mutex_unlock(&(lock))
  #define __lockx_init(lock,attr) do { \
      pthread_mutexattr_init(&(attr)); \
      pthread_mutexattr_settype(&(attr), PTHREAD_MUTEX_RECURSIVE); \
      pthread_mutex_init(&(lock), &(attr)); \
  } while (0)

  /* parameters were previously declared as (attr,attr), leaving `lock` undefined */
  #define __lockx_exit(lock,attr) do { \
      pthread_mutex_destroy(&(lock)); \
      pthread_mutexattr_destroy(&(attr)); \
  } while (0)

  #endif


  /* Element count of a true array (not valid for pointers or array parameters). */
  #define ARRAY_SIZE(x) (sizeof(x) / sizeof(*(x)))

  70. #endif

  1. /**
  2.  * file: threadpool.c
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * @2011-11-06: initial
  6.  * @2011-11-13: add the notifier to register
  7.  */

  8. #include "threadpool.h"

  /*
   * Worker-thread entry point.
   *
   * pvoid is cast to the thread's own thrd_desc_t. The thread loops forever:
   * it waits on hevent_go, publishes stat_busy on the pool's notifier chain,
   * runs the installed hook once per pending ref_cnts (holding the
   * descriptor lock), optionally clears the hook, then publishes stat_free.
   *
   * Only the WIN32 build has an exit path (hevent_exit); the pthread build
   * never leaves the loop on its own.
   */
  9. #ifdef WIN32
  10. static DWORD WINAPI thread_proc(LPVOID pvoid)
  11. #else
  12. static void *thread_proc(void *pvoid)
  13. #endif
  14. {
  15.     thrdpool_t* pthrdpool = thrdpool();
  16.     thrd_desc_t* pthrd_desc=(thrd_desc_t*)pvoid;

  17.     thrd_msg_t thrd_msg;

      /* NOTE(review): __assert may be compiled out, so a NULL pvoid is dereferenced just below */
  18.     __assert(pthrd_desc&&pthrdpool);

      /* message reused for every notification sent by this thread; carries the thread handle */
  19.        INIT_THRD_MSG(thrd_msg, pthrd_desc->h);
  20.         
  21.     while (1)
  22.     {
          /* block until someone signals hevent_go */
  23. #        ifdef WIN32
  24.         if (WaitForSingleObject((HANDLE)(pthrd_desc->hevent_go), INFINITE) == WAIT_OBJECT_0)
  25. #        else
          /*
           * NOTE(review): the wait is not wrapped in a predicate loop, so a
           * signal sent while this thread is not waiting appears to be lost,
           * and a spurious wake-up is treated as a real one - confirm.
           */
  26.         int ret=0;
  27.         __lockx(pthrd_desc->cslock);
  28.         ret = pthread_cond_wait(&pthrd_desc->hevent_go, &pthrd_desc->cslock);
  29.         __unlockx(pthrd_desc->cslock);
  30.         if (ret == 0)
  31. #        endif
  32.         {
              /* tell subscribers this thread is now busy */
  33.             call_chain(pthrdpool->thrdpool_chain,thrd_msg,stat_busy);
  34.             __lockx(pthrd_desc->cslock);
              /* run the hook once for every wake-up counted in ref_cnts */
  35.             while(pthrd_desc->ref_cnts>0) {
  36.                 if (pthrd_desc->thrd_hook)
  37.                     pthrd_desc->thrd_hook(pthrd_desc->pthrd_data);
  38.                 pthrd_desc->ref_cnts--;
  39.             }
  40. #            ifdef WIN32
  41.                 RESET_EVENT(pthrd_desc->hevent_go);
  42. #            endif
              /* non-zero pool mod: forget the hook so the descriptor can be allocated again */
  43.             if (pthrd_desc&&pthrdpool->mod)
  44.                 CLEAN_THRD_HOOK(*pthrd_desc);
  45.             __unlockx(pthrd_desc->cslock);

  46.         }
          /* WIN32 only: leave the loop once hevent_exit has been signalled */
  47. #        ifdef WIN32
  48.         if (WaitForSingleObject(pthrd_desc->hevent_exit, 0) == WAIT_OBJECT_0)
  49.             break;
  50. #        endif
  51.         
          /* tell subscribers this thread is idle again */
  52.         call_chain(pthrdpool->thrdpool_chain,thrd_msg,stat_free);
  53.     } /* while(1) */
  54.     return 0;    
  55. }

  56. static thrd_handle __thrd_alloc(thrd_hook_t thrd_hook, void* pdata)
  57. {
  58.     struct list_head *list_p;
  59.     thrd_desc_t* pthrd_desc=null;
  60.     
  61.     thrdpool_t* pthrdpool = thrdpool();    
  62.     __assert(pthrdpool);
  63.     __lockx(pthrdpool->cslock);
  64.     list_for_each(list_p, &(pthrdpool->list))
  65.     {
  66.         pthrd_desc = list_entry(list_p, thrd_desc_t, list);
  67.         if (!pthrd_desc->thrd_hook)
  68.         {
  69.             __lockx(pthrd_desc->cslock);
  70.             SET_HANDLER_THRD_DESC(*pthrd_desc, thrd_hook, pdata);    
  71.             __unlockx(pthrd_desc->cslock);
  72.             __unlockx(pthrdpool->cslock);
  73.             return pthrd_desc->h;
  74.         }
  75.     }
  76.     __unlockx(pthrdpool->cslock);
  77.     return 0;
  78. }

  79. static int __has_free_thrds()
  80. {
  81.     struct list_head *list_p;
  82.     thrd_desc_t* pthrd_desc=null;
  83.     
  84.     thrdpool_t* pthrdpool = thrdpool();    
  85.     __assert(pthrdpool);
  86.     __lockx(pthrdpool->cslock);
  87.     list_for_each(list_p, &(pthrdpool->list))
  88.     {
  89.         pthrd_desc = list_entry(list_p, thrd_desc_t, list);
  90.         if (0==pthrd_desc->ref_cnts) {
  91.             __unlockx(pthrdpool->cslock);
  92.             return 1;
  93.         }
  94.     }    
  95.     __unlockx(pthrdpool->cslock);
  96.     return 0;
  97. }

  98. static void __thrd_free(thrd_handle h)
  99. {
  100.     struct list_head *list_p;
  101.     thrd_desc_t* pthrd_desc=null;
  102.     
  103.     thrdpool_t* pthrdpool = thrdpool();    
  104.     __assert(pthrdpool);
  105.     __lockx(pthrdpool->cslock);
  106.     list_for_each(list_p, &(pthrdpool->list))
  107.     {
  108.         pthrd_desc = list_entry(list_p, thrd_desc_t, list);
  109.         if (h == pthrd_desc->h) {
  110.             __lockx(pthrd_desc->cslock);
  111.             CLEAR_THRD_DESC(*pthrd_desc);    
  112.             __unlockx(pthrd_desc->cslock);
  113.         }
  114.     }    
  115.     __unlockx(pthrdpool->cslock);
  116. }
  117.     
  118. static int __wakeup(thrd_handle h)
  119. {
  120.     struct list_head *list_p;
  121.     thrd_desc_t* pthrd_desc=null;
  122.     
  123.     thrdpool_t* pthrdpool = thrdpool();    
  124.     __assert(pthrdpool);
  125.     __lockx(pthrdpool->cslock);
  126.     list_for_each(list_p, &(pthrdpool->list))
  127.     {
  128.         pthrd_desc = list_entry(list_p, thrd_desc_t, list);
  129.         if (h==pthrd_desc->h)
  130.         {
  131.             __lockx(pthrd_desc->cslock);
  132.             if (hnull != pthrd_desc->hevent_go)
  133.             {
  134.                 SET_EVENT(pthrd_desc->hevent_go);
  135.                 pthrd_desc->ref_cnts++;
  136.             }
  137.             __unlockx(pthrd_desc->cslock);
  138.             __unlockx(pthrdpool->cslock);
  139.             return 0;
  140.         }
  141.     }    
  142.     __unlockx(pthrdpool->cslock);
  143.     return -1;
  144. }
  145.     
  146. static int __thrd_add()
  147. {
  148.     thrdpool_t* pthrdpool = thrdpool();
  149.     thrd_desc_t* pthrd_desc=null;
  150.     __assert(pthrdpool);

  151.     pthrd_desc=(thrd_desc_t*)malloc(sizeof(thrd_desc_t));
  152.     __assert(pthrd_desc);
  153.     
  154.     INIT_THRD_DESC(*pthrd_desc);

  155.     __lockx_init(pthrd_desc->cslock,pthrd_desc->attr);

  156. #    ifdef WIN32    
  157.     pthrd_desc->h = CreateThread(
  158.         null, // default security attributes
  159.         0, // use default stack size
  160.         thread_proc,// thread function
  161.         pthrd_desc, // argument to thread function
  162.         0, // use default creation flags
  163.         0);
  164.     if(NULL == pthrd_desc->h)
  165.         goto __err;
  166. #    else
  167.     if(pthread_create(&pthrd_desc->h, null, thread_proc, null)!=0)
  168.         goto __err;
  169. #    endif
  170.     
  171.     __lockx(pthrdpool->cslock);
  172.     list_add(&pthrd_desc->list, &pthrdpool->list);
  173.     __unlockx(pthrdpool->cslock);
  174.     return 0;

  175. __err:
  176.     __lockx_exit(pthrd_desc->cslock, pthrd_desc->attr);
  177.     free(pthrd_desc);

  178.     return -1;
  179. }

  /*
   * Stop the worker thread identified by h and remove its descriptor from
   * the pool: ask the thread to leave, wait for it, destroy its lock and
   * events, unlink the descriptor and free it. The pool lock is held for
   * the whole operation. Returns 0 whether or not h was found.
   */
  180. static int __thrd_del(thrd_handle h)
  181. {
  182.     struct list_head *list_p;
  183.     thrd_desc_t* pthrd_desc=null;
  184.     

  185.     thrdpool_t* pthrdpool = thrdpool();    
  186.     __assert(pthrdpool);
  187.     __lockx(pthrdpool->cslock);
  188.     list_for_each(list_p, &(pthrdpool->list))
  189.     {
  190.         pthrd_desc = list_entry(list_p, thrd_desc_t, list);
  191.         if (h==pthrd_desc->h) {
  192.             
              /*
               * WIN32: raise the exit flag, then wake the thread so it sees it.
               * pthreads: wake the thread, wait, then cancel it.
               */
  193. #            ifdef WIN32
  194.             SET_EVENT(pthrd_desc->hevent_exit);
  195.             SET_EVENT(pthrd_desc->hevent_go);
  196. #            else
  197.             SET_EVENT(pthrd_desc->hevent_go);
  198.             sleep(1);
  199.             pthread_cancel(pthrd_desc->h);
  200. #            endif
  201.             
  202.             /* wait the thread exit */
  203.             if (hnull != pthrd_desc->h)
  204.             {
  205. #                ifdef WIN32
                  /* wait up to 2000 ms for the thread to end; terminate it forcibly on timeout */
  206.                 /* terminal the log server thread until 1 second over */
  207.                 unsigned long state = WaitForSingleObject (pthrd_desc->h, 2000);
  208.                 switch (state)
  209.                 {
  210.                 case WAIT_TIMEOUT:
  211.                     {
  212.                         unsigned long exit_code = 0;
  213.                         if (0 != TerminateThread(pthrd_desc->h, exit_code))
  214.                             ; /* terminate the server thread */
  215.                         else
  216.                             ; /* terminate the LogSrv thread failed */
  217.                         break;
  218.                     }
  219.                 case WAIT_ABANDONED: break; /* the Thread abandoned */
  220.                 case WAIT_OBJECT_0: break; /* exit the server thread */
  221.                 default: break;
  222.                 }
  223.                 CloseHandle(pthrd_desc->h);
  224. #                else
                  /* the join result is stored but never examined */
  225.                 int state = pthread_join(pthrd_desc->h, null);
  226. #                endif
  227.                 pthrd_desc->h=hnull;
  228.             }

  229.             __lockx_exit(pthrd_desc->cslock,pthrd_desc->attr);

  230. #            ifdef WIN32
  231.             EXIT_EVENT(pthrd_desc->hevent_exit);
  232. #            endif

  233.             EXIT_EVENT(pthrd_desc->hevent_go);

  234.             /* remove itself from the list */
  235.             list_del(&pthrd_desc->list);
  236.             /* free the memory from the malloc of the thread descript struct */
  237.             free(pthrd_desc);
  238.             
              /* the node just walked is gone, so the loop must stop here */
  239.             break;
  240.         } // if (h==pthrd_desc->h)
  241.     }    
  242.     __unlockx(pthrdpool->cslock);
  243.     return 0;
  244. }

  245. static void __display(const thrd_desc_t* pthrd_desc)
  246. {
  247.     __assert(pthrd_desc);
  248.     printf("handle:%08x, hook: %s\n",
  249.         pthrd_desc->h, pthrd_desc->thrd_hook ? "installed":"uninstalled");
  250. }
  251.     
  252. static int __query()
  253. {
  254.     struct list_head *list_p;
  255.     thrd_desc_t* pthrd_desc=null;
  256.     
  257.     thrdpool_t* pthrdpool = thrdpool();    
  258.     __assert(pthrdpool);
  259.     __lockx(pthrdpool->cslock);
  260.     list_for_each(list_p, &(pthrdpool->list))
  261.     {
  262.         pthrd_desc = list_entry(list_p, thrd_desc_t, list);
  263.         __display(pthrd_desc);
  264.     }    
  265.     printf("------------------------------\n");
  266.     __unlockx(pthrdpool->cslock);
  267.     return 0;
  268. }

  269. static int __reg_notifier(struct notifierx_block *pnb)
  270. {
  271.     thrdpool_t* pthrdpool = thrdpool();    
  272.     __assert(pthrdpool&&pnb);
  273.     return notifierx_chain_register(&pthrdpool->thrdpool_chain, pnb);
  274. }

  275. static int __unreg_notifier(struct notifierx_block *pnb)
  276. {
  277.     thrdpool_t* pthrdpool = thrdpool();    
  278.     __assert(pnb);
  279.     return notifierx_chain_unregister(&pthrdpool->thrdpool_chain, pnb);
  280. }

  281. thrdpool_t* thrdpool()
  282. {
  283.     static thrdpool_t s_thrdpool;
  284.     return &s_thrdpool;
  285. }

  286. int thrdpool_init(int nthrds, int mod)
  287. {
  288.     thrdpool_t* pthrdpool = thrdpool();
  289.     int ret=0, count=0;
  290.     __assert(pthrdpool);

  291.     INIT_NOTIFIERX_HEAD(pthrdpool->thrdpool_chain);
  292.     __lockx_init(pthrdpool->cslock,pthrdpool->attr);

  293.     pthrdpool->mod=mod;

  294.     pthrdpool->list.next=&pthrdpool->list;
  295.     pthrdpool->list.prev=&pthrdpool->list;

  296.     pthrdpool->ops.thrd_alloc    =__thrd_alloc;
  297.     pthrdpool->ops.thrd_free    =__thrd_free;
  298.     pthrdpool->ops.wakeup    =__wakeup;
  299.     pthrdpool->ops.thrd_add =__thrd_add;
  300.     pthrdpool->ops.thrd_del    =__thrd_del;
  301.     pthrdpool->ops.query    =__query;
  302.     pthrdpool->ops.reg_notifier = __reg_notifier;
  303.     pthrdpool->ops.unreg_notifier = __unreg_notifier;
  304.     pthrdpool->ops.has_free_thrds = __has_free_thrds;

  305.     while (count<nthrds) {
  306.         ret=__thrd_add();
  307.         __assert(-1!=ret);
  308.         count++;
  309.     }
  310.     return 0;
  311. }

  312. int thrdpool_exit()
  313. {
  314.     struct list_head *list_p;
  315.     thrd_desc_t* pthrd_desc=null;
  316.     thrd_desc_t* pthrd_temp=null;
  317.     thrdpool_t* pthrdpool = thrdpool();    
  318.     __assert(pthrdpool);

  319.     list_for_each_safe(list_p, pthrd_temp, &(pthrdpool->list))
  320.     {
  321.         pthrd_desc = list_entry(list_p, thrd_desc_t, list);
  322.         if (hnull!=pthrd_desc->h)
  323.             __thrd_del(pthrd_desc->h);
  324.     }
  325.     __lockx_exit(pthrdpool->cslock,pthrdpool->attr);
  326.     EXIT_NOTIFIERX_HEAD(pthrdpool->thrdpool_chain);

  327.     return 0;
  328. }

  1. /**
  2.  * file: threadpool.h
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * @2011-11-06: initial
  6.  * @2011-11-13: add the notifier to register
  7.  */

  8. #ifndef _THREADPOOL_H_
  9. #define _THREADPOOL_H_

  10. #include "list.h"
  11. #include "notifierx.h"
  12. #include "platform.h"


  13. typedef void (*thrd_hook_t)(void *pdata);

  14. #define stat_busy    1
  15. #define stat_free    2

  /* Payload handed to notifier callbacks when a pool thread changes state. */
  16. typedef struct thrd_msg_s{
      /* handle of the thread sending the message */
  17.     thrd_handle h;
      /* stat_busy or stat_free */
  18.     int stat;
      /* not written by any code visible in this file */
  19.     void* v;
  20. }thrd_msg_t;

  /*
   * Helpers for thrd_msg_t. The statement macros are wrapped in
   * do { } while (0) so `MACRO(...);` is exactly one statement and can be
   * used in an un-braced if/else.
   */

  /* Fill a message for thread `hdl`, starting in the free state with no extra data. */
  #define INIT_THRD_MSG(st,hdl) \
  do { \
      (st).h=(hdl); \
      (st).stat=stat_free; \
      (st).v=0; \
  } while (0)

  /* Overwrite the state field of a message. */
  #define SET_THRD_STAT(st,s) \
  do { \
      (st).stat=(s); \
  } while (0)

  /* Expands to the two arguments "size, address" of st, for passing to notifierx_call_chain(). */
  #define ST_SIZE_AND_VAL(st) sizeof(st),&(st)
  /*
   * Set message `st` to state `s`, then deliver it to every subscriber of
   * `chain` (a struct notifierx_head passed by name).
   * Wrapped in do { } while (0) so it is a single statement.
   */
  #define call_chain(chain,st,s) \
  do { \
      SET_THRD_STAT((st), (s)); \
      notifierx_call_chain(&(chain), ST_SIZE_AND_VAL(st)); \
  } while (0)

  /*
   * Register function `fn` on the notifier chain of pool `tp` (passed by
   * name). A static notifier block named <fn>_nb is declared at the point
   * of use to carry the callback.
   * Wrapped in do { } while (0) so it is a single statement.
   */
  #define reg_thread_notifierx(tp,fn) do {        \
      static struct notifierx_block fn##_nb;      \
      (fn##_nb).notifierx_call=(fn);              \
      notifierx_chain_register(&(tp).thrdpool_chain, &(fn##_nb)); \
  } while (0)

  /* Per-thread bookkeeping record kept on the pool's list. */
  41. typedef struct thrd_desc_s{
      /* thread handle; also used as the lookup key by the pool operations */
  42.     thrd_handle h;
      /* function the thread runs when woken; 0 means the thread is unclaimed */
  43.     thrd_hook_t thrd_hook;
      /* argument passed to thrd_hook */
  44.     void *pthrd_data;
      /* signalled to wake the thread */
  45.     event_t hevent_go;
      /* signalled to make the thread leave its loop (only checked in the WIN32 build) */
  46.     event_t hevent_exit;
      /* node linking this descriptor into thrdpool_s.list */
  47.     struct list_head list;
      /* pending wake-ups: incremented by wakeup, decremented once per hook run */
  48.     unsigned int ref_cnts;
      /* lock guarding the hook, data pointer and ref_cnts */
  49.     lock_t cslock;
      /* attribute object passed with cslock to the lock macros */
  50.     attr_t attr;
  51. }thrd_desc_t;


  /*
   * Event helpers (Win32 manual-reset event / pthread condition variable).
   * Each macro is one statement (do { } while (0)).
   *
   * WIN32: CreateEvent() returns NULL on failure while hnull is
   * INVALID_HANDLE_VALUE, so both values are treated as "no event".
   * RESET_EVENT exists only in the WIN32 build.
   */
  #ifdef WIN32
  #define INIT_EVENT(h)  do { (h)=CreateEvent(NULL, TRUE, FALSE, NULL); } while (0)
  #define EXIT_EVENT(h)  do { if (NULL != (h) && hnull != (h)) CloseHandle(h); } while (0)
  #define SET_EVENT(h)   do { if (NULL != (h) && hnull != (h)) SetEvent(h); } while (0)
  #define RESET_EVENT(h) do { if (NULL != (h) && hnull != (h)) ResetEvent(h); } while (0)
  #else
  #define INIT_EVENT(h)  do { pthread_cond_init(&(h), null); } while (0)
  /* the closing parenthesis of pthread_cond_destroy() was missing */
  #define EXIT_EVENT(h)  do { pthread_cond_destroy(&(h)); } while (0)
  #define SET_EVENT(h)   do { pthread_cond_signal(&(h)); } while (0)
  #endif

  /*
   * Reset a thrd_desc_t (passed by name) to its initial state: no handle,
   * no hook, both events created, list node pointing at itself, no pending
   * wake-ups. The lock (cslock/attr) is not touched here.
   * Wrapped in do { } while (0) so it is a single statement.
   */
  #define INIT_THRD_DESC(td) \
  do { \
      (td).h = 0; \
      (td).thrd_hook = (thrd_hook_t)0; \
      (td).pthrd_data = 0; \
      INIT_EVENT((td).hevent_go); \
      INIT_EVENT((td).hevent_exit); \
      (td).list.prev=&(td).list; \
      (td).list.next=&(td).list; \
      (td).ref_cnts=0; \
  } while (0)

  /*
   * Hook management for a thrd_desc_t passed by name. Each macro is one
   * statement (do { } while (0)) and parenthesises its arguments.
   */

  /* Drop the hook but keep the data pointer. */
  #define CLEAN_THRD_HOOK(td) \
  do { \
      (td).thrd_hook = (thrd_hook_t)0; \
  } while (0)

  /* Drop both the hook and the data pointer. */
  #define CLEAR_THRD_DESC(td) \
  do { \
      (td).thrd_hook = (thrd_hook_t)0; \
      (td).pthrd_data = 0; \
  } while (0)

  /* Install hook h with argument p. */
  #define SET_HANDLER_THRD_DESC(td,h,p) \
  do { \
      (td).thrd_hook = (h); \
      (td).pthrd_data = (p); \
  } while (0)

  87. #define mod_tp_normal 0 /*mod==0: hook need NOT auto reset; */
  88. #define mod_tp_reset 1 /*mod==1: hook need to reset when next invoked */

  /* Operations table of the pool; filled in by thrdpool_init(). */
  89. typedef struct thrdpool_ops{
      /* claim an unclaimed thread for (thrd_hook, pdata); 0 when none is available */
  90.     thrd_handle (*thrd_alloc)(thrd_hook_t thrd_hook, void* pdata);
      /* clear the hook and data of the thread with handle h */
  91.     void (*thrd_free)(thrd_handle h);

      /* signal thread h to run its hook; -1 when h is not in the pool */
  92.     int (*wakeup)(thrd_handle h);
  93.     
      /* create one more worker thread / stop and remove thread h */
  94.     int (*thrd_add)();
  95.     int (*thrd_del)(thrd_handle h);
  96.     
      /* print every thread descriptor */
  97.     int (*query)();
  98.     
      /* 1 when some thread has no pending wake-ups, else 0 */
  99.     int (*has_free_thrds)();

      /* subscribe to / unsubscribe from thread state notifications */
  100.     int (*reg_notifier)(struct notifierx_block *pnb);
  101.     int (*unreg_notifier)(struct notifierx_block *pnb);    
  102. }thrdpool_ops_t;

  /* The thread pool: its operations, its threads and its state-change subscribers. */
  103. typedef struct thrdpool_s{
      /* operations table */
  104.     struct thrdpool_ops ops;
      /* list of thrd_desc_t.list nodes, one per worker thread */
  105.     struct list_head list;
      /* subscribers told when a thread becomes busy or free */
  106.     struct notifierx_head thrdpool_chain;
      /* lock guarding `list` */
  107.     lock_t cslock;
      /* attribute object passed with cslock to the lock macros */
  108.     attr_t attr;
  109.     int mod;    /*mod==0: hook need NOT auto reset;
  110.      mod==1: hook need to reset when next invoked */
  111. }thrdpool_t;

  112. extern thrdpool_t* thrdpool();

  113. extern int thrdpool_init(int nthrds, int mod);
  114. extern int thrdpool_exit();




  115. #endif

  1. /**
  2.  * file: workqueuex.c
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * @2011-11-15: initial
  6.  */

  7. #include "workqueuex.h"
  8. #include "threadpool.h"

  9. static struct workqueue_s *keventd_wq;

  10. tasks_sched_t *get_tasks_sched()
  11. {
  12.     static tasks_sched_t s_tasks_sched;
  13.     return &s_tasks_sched;
  14. }

  15. static struct workqueue_s* __create_wq(const char *name)
  16. {
  17.     struct workqueue_s *pnew_wq;
  18.     tasks_sched_t *pts = get_tasks_sched();
  19.     __assert(pts);

  20.     pnew_wq=malloc(sizeof(*pnew_wq));
  21.     __assert(pnew_wq);

  22.     INIT_WORKQUEUE(*pnew_wq,name,*pts);
  23.     
  24.     return pnew_wq;
  25. }

  26. static void __destroy_wq(struct workqueue_s *pwq)
  27. {
  28.     if (pwq) {
  29.         list_del(&pwq->list);
  30.         __lockx_exit(pwq->cslock, pwq->attr);
  31.         free(pwq);
  32.     }
  33. }

  34. static int __work_empty()
  35. {
  36.     struct list_head *wqlist_p;
  37.     struct workqueue_s *pwq_it;
  38. //    struct work_s *pwork_it;
  39.     
  40.     tasks_sched_t *pts=get_tasks_sched();
  41.     __assert(pts);
  42.     
  43.     list_for_each(wqlist_p, &(pts->wq_head))
  44.     {
  45.         pwq_it = list_entry(wqlist_p, workqueue_t, list);
  46.         __lockx(pwq_it->cslock);
  47.         if(!list_empty(&pwq_it->head)){
  48.             __unlockx(pwq_it->cslock);
  49.             return 0;
  50.         }
  51.         __unlockx(pwq_it->cslock);
  52.     }
  53.     return 1;
  54. }

  55. static struct work_s* __get_work()
  56. {
  57.     struct list_head *wqlist_p, *worklist_p;
  58.     struct workqueue_s *pwq_it;
  59.     struct work_s *pwork_it;

  60.     tasks_sched_t *pts=get_tasks_sched();
  61.     __assert(pts);
  62.     
  63.     list_for_each(wqlist_p, &(pts->wq_head))
  64.     {
  65.         pwq_it = list_entry(wqlist_p, workqueue_t, list);
  66.         __lockx(pwq_it->cslock);

  67.         list_for_each(worklist_p, &(pwq_it->head))
  68.         {
  69.             pwork_it = list_entry(worklist_p, work_t, list);
  70.             /* printf("[self:%08x,prev:%08x,next:%08x]\n",
  71.                 pwork_it, pwork_it->list.prev, pwork_it->list.next); */
  72.             list_del(&pwork_it->list);
  73.             __unlockx(pwq_it->cslock);
  74.             return pwork_it;
  75.         }
  76.         __unlockx(pwq_it->cslock);
  77.     }
  78.     return null;
  79. }

  80. static void __run(void)
  81. {
  82.     thrd_handle h=0;
  83.     thrdpool_t *ptp=thrdpool();
  84.     __assert(ptp);

  85.     if (ptp->ops.has_free_thrds()&&!__work_empty())
  86.     {
  87.         struct work_s* pnew_work=__get_work();
  88.         if(pnew_work) {
  89.             h=ptp->ops.thrd_alloc(pnew_work->fwork, pnew_work);
  90.             if (-1!=h) ptp->ops.wakeup(h);
  91.         }
  92.     }
  93. }

  94. int queue_work(struct workqueue_s *pwq, struct work_s *pwork)
  95. {
  96.     __assert(pwq&&pwork);
  97.     
  98.     __lockx(pwq->cslock);        

  99.     list_add(&pwork->list, &pwq->head);

  100.     __unlockx(pwq->cslock);
  101.     return 0;
  102. }

  103. int schedule_work(struct work_s *pwork)
  104. {
  105.     __assert(pwork);
  106.     queue_work(keventd_wq, pwork);
  107.     __run();
  108.     return 0;
  109. }


  110. static int __workqueue_thrd_callback(struct notifierx_block *pnb,
  111.                                      unsigned long nlen, const void *pdata)
  112. {
  113.     thrd_msg_t *pmsg=(thrd_msg_t*)pdata;
  114.     __assert(pnb);
  115.     
  116.     if (!pmsg) return 0;
  117.     switch(pmsg->stat) {
  118.     case stat_busy:
  119.         break;
  120.     case stat_free:
  121.         if(!__work_empty())
  122.             __run();
  123.         break;
  124.     default:
  125.         break;
  126.     }
  127.     return 0;
  128. }


  129. void init_tasks_sched(void)
  130. {
  131.     tasks_sched_t *pts=get_tasks_sched();
  132.     thrdpool_t *ptp=thrdpool();
  133.     
  134.     thrdpool_init(threads_number,mod_tp_reset);

  135.     __assert(pts&&ptp);
  136.     pts->pthrd_pool = ptp;
  137.     INIT_LIST_HEAD(&pts->wq_head);
  138.     pts->run == __run;

  139.     reg_thread_notifierx(*ptp,__workqueue_thrd_callback);
  140.     keventd_wq = __create_wq("events");
  141.     
  142. }

  143. void exit_tasks_sched(void)
  144. {
  145.     struct list_head *wqlist_p;
  146.     struct workqueue_s *pwq_it, *pwq_temp;
  147.     
  148.     tasks_sched_t *pts = get_tasks_sched();
  149.     __assert(pts);
  150.     
  151.     list_for_each_safe(wqlist_p, pwq_temp, &(pts->wq_head))
  152.     {
  153.         pwq_it = list_entry(wqlist_p, workqueue_t, list);
  154.         __destroy_wq(pwq_it);
  155.     }
  156.     
  157.     thrdpool_exit();
  158. }

  1. /**
  2.  * file: workqueuex.h
  3.  * author: vincent.cws2008@gmail.com
  4.  * history:
  5.  * @2011-11-15: initial
  6.  */

  7. #ifndef _WORKQUEUE_H_
  8. #define _WORKQUEUE_H_

  9. #include "workqueuex.h"

  10. #include "list.h"

  11. #include "platform.h"

  12. #define threads_number 1

  /* Forward declarations (a storage class such as `extern` is meaningless on a bare struct tag). */
  struct workqueue_s;
  struct work_s;

  /* Work handler: called with the work item it belongs to. */
  typedef void (*fwork_t)(struct work_s *pwork);

  /* Scheduler state: the pool that runs work and the list of work queues. */
  16. typedef struct tasks_sched{
      /* thread pool used to execute work items */
  17.     struct thrdpool_s *pthrd_pool;
      /* dispatch routine; not called through this pointer by any code visible here */
  18.     void (*run)(void);
      /* list of workqueue_s.list nodes */
  19.     struct list_head wq_head;
  20. }tasks_sched_t;


  /* A named queue of pending work items. */
  21. typedef struct workqueue_s {
  22.     struct list_head list;    /* list member of the workqueues */
  23.     struct list_head head;    /* head of the sub-list */
      /* queue name as given at creation; the string is referenced, not copied */
  24.     const char *name;
  25.     int nthread;            /* -1 for any thread, else thread_number=nthread%numbers */
      /* lock guarding `head` */
  26.     lock_t cslock;
      /* attribute object passed with cslock to the lock macros */
  27.     attr_t attr;
  28. }workqueue_t;

  /*
   * Initialise work queue `wq` (passed by name) with name `pname` and link
   * it into scheduler `ts` (also passed by name).
   * The name is stored in the queue itself; previously the macro assigned
   * to a bare `name` identifier from the caller's scope.
   * Wrapped in do { } while (0) so it is a single statement.
   */
  #define INIT_WORKQUEUE(wq,pname,ts) do { \
      (wq).name=(pname); \
      __lockx_init((wq).cslock,(wq).attr); \
      INIT_LIST_HEAD(&(wq).head); \
      list_add(&(wq).list, &(ts).wq_head); \
      (wq).nthread = 0; \
  } while (0)

  /* One unit of deferred work. */
  36. typedef struct work_s {
  37. //    void *pdata;
      /* handler run by a pool thread; it receives this work item */
  38.     fwork_t fwork;
      /* node linking the item into workqueue_s.head */
  39.     struct list_head list;
  40. }work_t;

  /*
   * Initialise work item `w` (passed by name) with handler `f` and an
   * unlinked list node.
   * Wrapped in do { } while (0) so it is a single statement.
   */
  #define INIT_WORK(w,f) do { \
      (w).fwork=(f); \
      INIT_LIST_HEAD(&(w).list); \
  } while (0)

  /*
   * Define a work item named `w` with handler `f` and a self-linked (empty)
   * list node. The initialiser is spelled out here because no
   * __WORK_INITIALIZER macro is defined in this header.
   */
  #define DECLARE_WORK(w, f)    \
      struct work_s w = { .fwork = (f), .list = { .next = &(w).list, .prev = &(w).list } }

  /*
   * Heap-allocate a work item with handler `f` and store it in `pnew`.
   * `pnew` is left null when the allocation fails - callers must check,
   * because __assert() may be compiled out.
   * Wrapped in do { } while (0) so it is a single statement.
   */
  #define NEW_WORK(pnew,f) do { \
      (pnew)=(work_t*)malloc(sizeof(work_t)); \
      __assert(pnew); \
      if (pnew) \
          INIT_WORK(*(pnew), (f)); \
  } while (0)

  /* Release a work item obtained from NEW_WORK; a null pointer is accepted (free(NULL) is a no-op). */
  #define FREE_WORK(pnew) do { free(pnew); } while (0)


  53. extern int queue_work(struct workqueue_s *pwq, struct work_s *pwork);
  54. extern int schedule_work(struct work_s *pwork);

  55. extern tasks_sched_t *get_tasks_sched();

  56. extern void init_tasks_sched(void);
  57. extern void exit_tasks_sched(void);

  58. #endif

附件如下:

 workqueue.rar  


---- 代码一点一点积累,问题一点一点解决,莫浮躁,切记!

阅读(1978) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~