Chinaunix首页 | 论坛 | 博客
  • 博客访问: 85631
  • 博文数量: 12
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 09:57
文章分类

全部博文(12)

文章存档

2015年(11)

2014年(1)

我的朋友

分类: 服务器与存储

2015-08-09 02:23:19

    之前讲到SSDB会注册任务处理函数,还会将客户端的请求封装为任务进行分发处理:
    1. 注册处理函数

点击(此处)折叠或打开


  1. // net/proc.h 
  2. #define DEF_PROC(f) int proc_##f(NetworkServer *net, Link *link, const Request &req, Response *resp)

  3. // serv.cpp
  4. #define REG_PROC(c, f) net->proc_map.set_proc(#c, f, proc_##c)

  5. void SSDBServer::reg_procs(NetworkServer *net){
  6.     REG_PROC(get, "rt");
  7.     REG_PROC(set, "wt");
  8.     REG_PROC(del, "wt");
  9.     REG_PROC(setx, "wt");
  10.     REG_PROC(setnx, "wt");
  11.     REG_PROC(getset, "wt");
  12.     REG_PROC(getbit, "rt");
  13.     REG_PROC(setbit, "wt");
  14.     REG_PROC(countbit, "rt");
  15.     REG_PROC(substr, "rt");
  16.     REG_PROC(getrange, "rt");
  17.     REG_PROC(strlen, "rt");
  18.     .......
  19. }
    2. 任务分发处理:
    

点击(此处)折叠或打开


  1. // server.cpp 
  2. if(cmd->flags & Command::FLAG_THREAD){
  3.             if(cmd->flags & Command::FLAG_WRITE){
  4.                 job->result = PROC_THREAD;
  5.                 // 写任务线程池处理
  6.                 writer->push(*job);
  7.             }else{
  8.                 job->result = PROC_THREAD;
  9.                 // 读任务线程池处理
  10.                 reader->push(*job);
  11.             }
  12.             return;
  13.         }

  14.         proc_t p = cmd->proc;
  15.         job->time_wait = 1000 * (millitime() - job->stime);
  16.         // 当前主线程处理
  17.         job->result = (*p)(this, job->link, *req, &resp);
  18.         job->time_proc = 1000 * (millitime() - job->stime) - job->time_wait;

下面将详细分析SSDB中的任务和任务执行

一. ProcJob && Command && ProcMap

    与任务相关的数据结构的定义都放在proc.h中,交给任务线程池处理的对象是Command类型的,而不是ProcJob类型,Command对象是根据ProcJob构造来的。它们两者的区别在于:
    ProcJob关心的是在哪个Link上需要进行任务处理,至于处理的具体任务是什么,处理函数是什么,ProcJob并不关心。
    Command 则是众多处理任务中的一个,它包含了处理函数是什么,具体任务类型是什么。在一个Link(连接)上可能会发生很多次任务处理,那么只会有一个ProcJob,但会包含很多Command。

    ProcMap则是处理函数的注册表,NetworkServer将根据名字查找注册函数,并获得任务类型

点击(此处)折叠或打开

  1. #define PROC_OK            0
  2. #define PROC_ERROR        -1
  3. #define PROC_THREAD 1
  4. #define PROC_BACKEND    100

  5. // 通过宏定义处理函数
  6. #define DEF_PROC(f) int proc_##f(NetworkServer *net, Link *link, const Request &req, Response *resp)

  7. typedef std::vector<Bytes> Request;
  8. // 处理函数的类型
  9. typedef int (*proc_t)(NetworkServer *net, Link *link, const Request &req, Response *resp);

  10. // Command类
  11. // FLAG_READ    : 读任务
  12. // FLAG_WRITE   : 写任务
  13. // FLAG_BACKEND : 需要新建线程背地执行的特殊任务(例如主从同步,日志处理)
  14. // FLAG_THREAD  : 需要放到任务线程池中执行的任务
  15. struct Command{
  16.     static const int FLAG_READ        = (1 << 0);
  17.     static const int FLAG_WRITE        = (1 << 1);
  18.     static const int FLAG_BACKEND    = (1 << 2);
  19.     static const int FLAG_THREAD    = (1 << 3);

  20.     std::string name;
  21.     int flags;
  22.     proc_t proc;
  23.     // 统计相关的变量
  24.     uint64_t calls;
  25.     double time_wait;
  26.     double time_proc;
  27.     
  28.     Command(){
  29.         flags = 0;
  30.         proc = NULL;
  31.         calls = 0;
  32.         time_wait = 0;
  33.         time_proc = 0;
  34.     }
  35. };


  36. // ProcJob类
  37. struct ProcJob{
  38.     int result;
  39.     NetworkServer *serv;
  40.     // 任务涉及的连接
  41.     Link *link;
  42.     Command *cmd;
  43.     double stime;
  44.     double time_wait;
  45.     double time_proc;
  46.     
  47.     ProcJob(){
  48.         result = 0;
  49.         serv = NULL;
  50.         link = NULL;
  51.         cmd = NULL;
  52.         stime = 0;
  53.         time_wait = 0;
  54.         time_proc = 0;
  55.     }
  56. };


  57. struct BytesEqual{
  58.     bool operator()(const Bytes &s1, const Bytes &s2) const {
  59.         return (bool)(s1.compare(s2) == 0);
  60.     }
  61. };
  62. struct BytesHash{
  63.     size_t operator()(const Bytes &s1) const {
  64.         unsigned long __h = 0;
  65.         const char *p = s1.data();
  66.         for (int i=0 ; i<s1.size(); i++)
  67.             __h = 5*__h + p[i];
  68.         return size_t(__h);
  69.     }
  70. };


  71. #define GCC_VERSION (__GNUC__ * 100 + __GNUC_MINOR__)
  72. #if GCC_VERSION >= 403
  73.     #include <tr1/unordered_map>
  74.     typedef std::tr1::unordered_map<Bytes, Command *, BytesHash, BytesEqual> proc_map_t;
  75. #else
  76.     #ifdef NEW_MAC
  77.         #include <unordered_map>
  78.         typedef std::unordered_map<Bytes, Command *, BytesHash, BytesEqual> proc_map_t;
  79.     #else
  80.         #include <ext/hash_map>
  81.         typedef __gnu_cxx::hash_map<Bytes, Command *, BytesHash, BytesEqual> proc_map_t;
  82.     #endif
  83. #endif

  84. // 将所有的处理函数都注册到ProcMap中
  85. class ProcMap
  86. {
  87. private:
  88.     proc_map_t proc_map;

  89. public:
  90.     ProcMap();
  91.     ~ProcMap();
  92.     void set_proc(const std::string &cmd, const char *sflags, proc_t proc);
  93.     void set_proc(const std::string &cmd, proc_t proc);
  94.     Command* get_proc(const Bytes &str);
  95.     
  96.     proc_map_t::iterator begin(){
  97.         return proc_map.begin();
  98.     }
  99.     proc_map_t::iterator end(){
  100.         return proc_map.end();
  101.     }
  102. };


二 . 任务执行: WorkerPool && ProcWoker

    在NetworkServer解析出客户端的请求后,会根据需要将任务添加到任务线程池中处理,处理完成之后,再从线程池的结果队列中获取已完成的任务
    

点击(此处)折叠或打开

  1. if(cmd->flags & Command::FLAG_THREAD){
  2.             if(cmd->flags & Command::FLAG_WRITE){
  3.                 job->result = PROC_THREAD;
  4.                 writer->push(*job);
  5.             }else{
  6.                 job->result = PROC_THREAD;
  7.                 reader->push(*job);
  8.             }
  9.             return;
  10.         }

    任务线程池(reader, writer) 类型为WorkerPool,它的定义在thread.h中:

点击(此处)折叠或打开

  1. template<class W, class JOB>
  2. class WorkerPool{
  3.     public:
  4.         class Worker{
  5.             public:
  6.                 Worker(){};
  7.                 Worker(const std::string &name);
  8.                 virtual ~Worker(){}
  9.                 int id;
  10.                 virtual void init(){}
  11.                 virtual void destroy(){}
  12.                 virtual int proc(JOB *job) = 0;
  13.             private:
  14.             protected:
  15.                 std::string name;
  16.         };
  17.     private:
  18.         std::string name;

  19.         // 工作者池需要处理的JOB
  20.         Queue<JOB> jobs;
  21.         // 处理完成后的结果
  22.         SelectableQueue<JOB> results;

  23.         int num_workers;
  24.         std::vector<pthread_t> tids;
  25.         bool started;

  26.         struct run_arg{
  27.             int id;
  28.             WorkerPool *tp;
  29.         };
  30.         static void* _run_worker(void *arg);
  31.     public:
  32.         WorkerPool(const char *name="");
  33.         ~WorkerPool();

  34.         int fd(){
  35.             return results.fd();
  36.         }
  37.         
  38.         int start(int num_workers);
  39.         int stop();
  40.         
  41.         int push(JOB job);
  42.         int pop(JOB *job);
  43. };
   
    线程池启动:
    

点击(此处)折叠或打开

  1. template<class W, class JOB>
  2. void* WorkerPool<W, JOB>::_run_worker(void *arg){
  3.     struct run_arg *p = (struct run_arg*)arg;
  4.     int id = p->id;
  5.     WorkerPool *tp = p->tp;
  6.     delete p;

  7.     W w(tp->name);
  8.     Worker *worker = (Worker *)&w;
  9.     worker->id = id;
  10.     worker->init();
  11.     while(1){
  12.         JOB job;
  13.         // 获取任务
  14.         if(tp->jobs.pop(&job) == -1){
  15.             fprintf(stderr, "jobs.pop error\n");
  16.             ::exit(0);
  17.             break;
  18.         }
  19.         // 处理任务
  20.         worker->proc(&job);
  21.         // 写入结果
  22.         if(tp->results.push(job) == -1){
  23.             fprintf(stderr, "results.push error\n");
  24.             ::exit(0);
  25.             break;
  26.         }
  27.     }
  28.     worker->destroy();
  29.     return (void *)NULL;
  30. }

  31. template<class W, class JOB>
  32. int WorkerPool<W, JOB>::start(int num_workers){
  33.     this->num_workers = num_workers;
  34.     if(started){
  35.         return 0;
  36.     }
  37.     int err;
  38.     pthread_t tid;
  39.     for(int i=0; i<num_workers; i++){
  40.         struct run_arg *arg = new run_arg();
  41.         arg->id = i;
  42.         arg->tp = this;
  43.         // 创建任务线程
  44.         err = pthread_create(&tid, NULL, &WorkerPool::_run_worker, arg);
  45.         if(err != 0){
  46.             fprintf(stderr, "can't create thread: %s\n", strerror(err));
  47.         }else{
  48.             tids.push_back(tid);
  49.         }
  50.     }
  51.     started = true;
  52.     return 0;
  53. }


   
    Worker类抽象了具体的执行任务过程:
        step1 . 根据ProcJob获取对应的Command,执行其中的任务处理函数
        step2 . 将任务结果放到Link的输出缓冲区中

点击(此处)折叠或打开

  1. int ProcWorker::proc(ProcJob *job){
  2.     const Request *req = job->link->last_recv();
  3.     Response resp;
  4.     
  5.     proc_t p = job->cmd->proc;
  6.     // 任务等待的时长
  7.     job->time_wait = 1000 * (millitime() - job->stime);
  8.     // 根据传入的处理函数处理请求,得到结果
  9.     job->result = (*p)(job->serv, job->link, *req, &resp);
  10.     // 任务处理的时长
  11.     job->time_proc = 1000 * (millitime() - job->stime) - job->time_wait;
  12.     // 任务结果放到Link输出缓冲区中
  13.     if(job->link->send(resp.resp) == -1){
  14.         job->result = PROC_ERROR;
  15.     }else{
  16.         log_debug("w:%.3f,p:%.3f, req: %s, resp: %s",
  17.             job->time_wait, job->time_proc,
  18.             serialize_req(*req).c_str(),
  19.             serialize_req(resp.resp).c_str());
  20.     }
  21.     return 0;
  22. }


    线程池停止:
    

点击(此处)折叠或打开

  1. template<class W, class JOB>
  2. int WorkerPool<W, JOB>::stop(){
  3.     // TODO: notify works quit and wait
  4.     for(int i=0; i<tids.size(); i++){
  5. #ifdef OS_ANDROID
  6. #else
  7.         pthread_cancel(tids[i]);
  8. #endif
  9.     }
  10.     return 0;
  11. }


注意: WorkerPool中的任务队列( jobs ) 和结果队列 (
results )使用了不同的数据结构,这与它们的应用场景有关:
    Queue : Queue内部仅仅使用pthread_mutex和pthread_cond_t来同步读写操作,当Queue为空时,读者(任务线程)会阻塞在条件变量上等待新任务的到达。
    SelectableQueue :SelectableQueue的写入操作是工作者线程在完成任务之后写入结果,读取操作是网络处理主线程 (NetworkServer::serv )读取结果发送给客户端,因此在SelectableQueue为空时,不能让网络处理主线程阻塞在SelectableQueue上。因此在工作者线程写入时,会使用pipe通知主线程来读取,主线程读取的时候,仍然需要pthread_mutex同步。

    Queue的push和pop操作:

点击(此处)折叠或打开

  1. template <class T>
  2. int Queue<T>::push(const T item){
  3.     if(pthread_mutex_lock(&mutex) != 0){
  4.         return -1;
  5.     }
  6.     {
  7.         items.push(item);
  8.     }
  9.     pthread_mutex_unlock(&mutex);
  10.     pthread_cond_signal(&cond);
  11.     return 1;
  12. }

  13. template <class T>
  14. int Queue<T>::pop(T *data){
  15.     if(pthread_mutex_lock(&mutex) != 0){
  16.         return -1;
  17.     }
  18.     {
  19.         // 必须放在循环中, 因为 pthread_cond_wait 可能抢不到锁而被其它处理了
  20.         while(items.empty()){
  21.             //fprintf(stderr, "%d wait\n", pthread_self());
  22.             if(pthread_cond_wait(&cond, &mutex) != 0){
  23.                 //fprintf(stderr, "%s %d -1!\n", __FILE__, __LINE__);
  24.                 return -1;
  25.             }
  26.             //fprintf(stderr, "%d wait 2\n", pthread_self());
  27.         }
  28.         *data = items.front();
  29.         //fprintf(stderr, "%d job: %d\n", pthread_self(), (int)*data);
  30.         items.pop();
  31.     }
  32.     if(pthread_mutex_unlock(&mutex) != 0){
  33.         //fprintf(stderr, "error!\n");
  34.         return -1;
  35.     }
  36.         //fprintf(stderr, "%d wait end 2, job: %d\n", pthread_self(), (int)*data);
  37.     return 1;
  38. }


SelectableQueue的push和pop操作:

点击(此处)折叠或打开

  1. template <class T>
  2. int SelectableQueue<T>::push(const T item){
  3.     if(pthread_mutex_lock(&mutex) != 0){
  4.         return -1;
  5.     }
  6.     {
  7.         items.push(item);
  8.     }
  9.     // 写入管道通知主线程读取
  10.     if(::write(fds[1], "1", 1) == -1){
  11.         fprintf(stderr, "write fds error\n");
  12.         exit(0);
  13.     }
  14.     pthread_mutex_unlock(&mutex);
  15.     return 1;
  16. }

  17. template <class T>
  18. int SelectableQueue<T>::pop(T *data){
  19.     int n, ret = 1;
  20.     char buf[1];

  21.     while(1){
  22.         // 读取管道数据
  23.         n = ::read(fds[0], buf, 1);
  24.         if(n < 0){
  25.             if(errno == EINTR){
  26.                 continue;
  27.             }else{
  28.                 return -1;
  29.             }
  30.         }else if(n == 0){
  31.             ret = -1;
  32.         }else{
  33.             // phtead_mutex同步读取操作
  34.             if(pthread_mutex_lock(&mutex) != 0){
  35.                 return -1;
  36.             }
  37.             {
  38.                 if(items.empty()){
  39.                     fprintf(stderr, "%s %d error!\n", __FILE__, __LINE__);
  40.                     pthread_mutex_unlock(&mutex);
  41.                     return -1;
  42.                 }
  43.                 *data = items.front();
  44.                 items.pop();
  45.             }
  46.             pthread_mutex_unlock(&mutex);
  47.         }
  48.         break;
  49.     }
  50.     return ret;
  51. }

    


阅读(1880) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~