Chinaunix首页 | 论坛 | 博客
  • 博客访问: 85765
  • 博文数量: 12
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 09:57
文章分类

全部博文(12)

文章存档

2015年(11)

2014年(1)

我的朋友

分类: 服务器与存储

2015-07-18 23:38:10

    SSDB网络连接处理流程图,主要是NetworkServer::serve函数的流程



1.1 处理主循环

NetworkServer::serve函数是SSDB网络连接处理的主循环,也可以说是SSDB整个的处理循环,任务在这里被分发给任务线程池处理,处理结果在这里被返回给客户端。

点击(此处)折叠或打开

  1. void NetworkServer::serve(){
  2.     writer = new ProcWorkerPool("writer");
  3.     writer->start(num_writers);
  4.     reader = new ProcWorkerPool("reader");
  5.     reader->start(num_readers);

  6.     ready_list_t ready_list;
  7.     ready_list_t ready_list_2;
  8.     ready_list_t::iterator it;
  9.     const Fdevents::events_t *events;
  10.     
  11.     // 开始时设置三个需要监听的文件描述符,serv_link监听新连接到来,reader读任务线程池,write写任务线程池
  12.     fdes->set(serv_link->fd(), FDEVENT_IN, 0, serv_link);
  13.     fdes->set(this->reader->fd(), FDEVENT_IN, 0, this->reader);
  14.     fdes->set(this->writer->fd(), FDEVENT_IN, 0, this->writer);
  15.     
  16.     uint32_t last_ticks = g_ticks;
  17.     
  18.     while(!quit){
  19.         // status report
  20.         if((uint32_t)(g_ticks - last_ticks) >= STATUS_REPORT_TICKS){
  21.             last_ticks = g_ticks;
  22.             log_info("server running, links: %d", this->link_count);
  23.         }
  24.         
  25.         ready_list.swap(ready_list_2);
  26.         ready_list_2.clear();
  27.         
  28.         if(!ready_list.empty()){
  29.             // ready_list not empty, so we should return immediately
  30.             events = fdes->wait(0);
  31.         }else{
  32.             events = fdes->wait(50);
  33.         }
  34.         if(events == NULL){
  35.             log_fatal("events.wait error: %s", strerror(errno));
  36.             break;
  37.         }
  38.         
  39.         // 处理有读写事件发生的连接,将准备好的连接添加到ready_list中
  40.         for(int i=0; i<(int)events->size(); i++){
  41.             const Fdevent *fde = events->at(i);
  42.             // 1.事件发生在监听Link上,说明是新的连接到达
  43.             if(fde->data.ptr == serv_link){
  44.                 Link *link = accept_link();
  45.                 if(link){
  46.                     this->link_count ++;                
  47.                     log_debug("new link from %s:%d, fd: %d, links: %d",
  48.                         link->remote_ip, link->remote_port, link->fd(), this->link_count);
  49.                     // 将新的连接添加到监听队列中
  50.                     fdes->set(link->fd(), FDEVENT_IN, 1, link);
  51.                 }
  52.             // 2. 事件发生在两个任务队列上,说明有任务完成,调用proc_result处理结果
  53.             }else if(fde->data.ptr == this->reader || fde->data.ptr == this->writer){
  54.                 ProcWorkerPool *worker = (ProcWorkerPool *)fde->data.ptr;
  55.                 ProcJob job;
  56.                 if(worker->pop(&job) == 0){
  57.                     log_fatal("reading result from workers error!");
  58.                     exit(0);
  59.                 }
  60.                 if(proc_result(&job, &ready_list) == PROC_ERROR){
  61.                     //
  62.                 }
  63.             // 3. 处理其他客户端连接,这里将对客户端发送来的数据进行处理,解析客户端的请求
  64.             }else{
  65.                 proc_client_event(fde, &ready_list);
  66.             }
  67.         }

  68.         for(it = ready_list.begin(); it != ready_list.end(); it ++){
  69.             Link *link = *it;
  70.             if(link->error()){
  71.                 this->link_count --;
  72.                 fdes->del(link->fd());
  73.                 delete link;
  74.                 continue;
  75.             }

  76.             // link的recv函数将解析收到的数据
  77.             const Request *req = link->recv();
  78.             if(req == NULL){
  79.                 log_warn("fd: %d, link parse error, delete link", link->fd());
  80.                 this->link_count --;
  81.                 fdes->del(link->fd());
  82.                 delete link;
  83.                 continue;
  84.             }
  85.             // req为空,说明没有解析出完整的请求,仍然需要监听读事件,期待继续读入数据解析
  86.             if(req->empty()){
  87.                 fdes->set(link->fd(), FDEVENT_IN, 1, link);
  88.                 continue;
  89.             }
  90.             
  91.             // 如果收到的数据已经解析完成,则创建JOB对象供任务线程池处理
  92.             link->active_time = millitime();

  93.             ProcJob job;
  94.             job.link = link;
  95.             this->proc(&job);
  96.             if(job.result == PROC_THREAD){
  97.                 fdes->del(link->fd());
  98.                 continue;
  99.             }
  100.             if(job.result == PROC_BACKEND){
  101.                 fdes->del(link->fd());
  102.                 this->link_count --;
  103.                 continue;
  104.             }
  105.             
  106.             // 有一些任务在proc函数中由当前线程处理完成,因此添加到ready_list_2中,
  107.             // 在外层while循环的开头,会交换ready_list_2和ready_list,然后处理ready_list
  108.             if(proc_result(&job, &ready_list_2) == PROC_ERROR){
  109.                 //
  110.             }
  111.         } // end foreach ready link
  112.     }
  113. }

1.2 对任务线程池结果的处理

对任务结果的处理放在proc_result函数中,在这里,会将结果发送给客户端

点击(此处)折叠或打开

  1. int NetworkServer::proc_result(ProcJob *job, ready_list_t *ready_list){
  2.     Link *link = job->link;
  3.     int len;
  4.      
  5.     // 用于统计计数       
  6.     if(job->cmd){
  7.         job->cmd->calls += 1;
  8.         job->cmd->time_wait += job->time_wait;
  9.         job->cmd->time_proc += job->time_proc;
  10.     }
  11.     if(job->result == PROC_ERROR){
  12.         log_info("fd: %d, proc error, delete link", link->fd());
  13.         goto proc_err;
  14.     }
  15.     
  16.     // 这里将任务产生的结果发送给客户端
  17.     len = link->write();
  18.     //log_debug("write: %d", len);
  19.     if(len < 0){
  20.         log_debug("fd: %d, write: %d, delete link", link->fd(), len);
  21.         goto proc_err;
  22.     }

  23.     // 如果输出缓冲区不为空,则说明还有数据未完全发送给客户端,需要关注写事件
  24.     if(!link->output->empty()){
  25.         fdes->set(link->fd(), FDEVENT_OUT, 1, link);
  26.     }
  27.     // 如果输入缓冲区为空,说明客户端发送过来的数据已经全部解析完成,继续关注读事件
  28.     if(link->input->empty()){
  29.         fdes->set(link->fd(), FDEVENT_IN, 1, link);
  30.     }else{
  31.         // 读缓冲区不为空, 说明还有输入的数据需要处理,将link放到ready_list里面等待处理,同时
  32.         // 暂时不再关注该连接上的读事件
  33.         fdes->clr(link->fd(), FDEVENT_IN);
  34.         ready_list->push_back(link);
  35.     }
  36.     return PROC_OK;

  37. proc_err:
  38.     this->link_count --;
  39.     fdes->del(link->fd());
  40.     delete link;
  41.     return PROC_ERROR;
  42. }

1.3    对客户端事件处理

proc_client_event处理客户端的交互:

1.    处理读事件,将数据从内核缓冲区读到link自带的应用层缓冲区(后面将对应用层缓冲区中的数据进行解析,获取客户端的真实请求)。

2.    处理写事件,将结果数据发送给客户端(将Link的写缓冲区中的数据交给内核发送)。


点击(此处)折叠或打开

  1. /*
  2. event:
  3.     read => ready_list OR close
  4.     write => NONE
  5. proc =>
  6.     done: write & (read OR ready_list)
  7.     async: stop (read & write)
  8.     
  9. 1. When writing to a link, it may happen to be in the ready_list,
  10. so we cannot close that link in write process, we could only
  11. just mark it as closed.

  12. 2. When reading from a link, it is never in the ready_list, so it
  13. is safe to close it in read process, also safe to put it into
  14. ready_list.

  15. 3. Ignore FDEVENT_ERR

  16. A link is in either one of these places:
  17.     1. ready list
  18.     2. async worker queue
  19. So it safe to delete link when processing ready list and async worker result.
  20. */

  21. int NetworkServer::proc_client_event(const Fdevent *fde, ready_list_t *ready_list){
  22.     Link *link = (Link *)fde->data.ptr;
  23.     // 连接上有数据可读,则读到link的缓冲区中,并将link添加到ready_list等待接下来的处理
  24.     if(fde->events & FDEVENT_IN){
  25.         ready_list->push_back(link);
  26.         if(link->error()){
  27.             return 0;
  28.         }
  29.         // 读取数据到接收缓冲区
  30.         int len = link->read();
  31.         //log_debug("fd: %d read: %d", link->fd(), len);
  32.         if(len <= 0){
  33.             log_debug("fd: %d, read: %d, delete link", link->fd(), len);
  34.             link->mark_error();
  35.             return 0;
  36.         }
  37.     }
  38.     // 连接上有写事件发生,将输出缓冲区中的数据写到客户端,这里不会将link添加到ready_list,因为向客户端写数据说明该连接上的请求已处理完成,解析来只用将数据全部写到客户端即可
  39.     if(fde->events & FDEVENT_OUT){
  40.         if(link->error()){
  41.             return 0;
  42.         }
  43.         // 写数据
  44.         int len = link->write();
  45.         if(len <= 0){
  46.             log_debug("fd: %d, write: %d, delete link", link->fd(), len);
  47.             link->mark_error();
  48.             return 0;
  49.         }
  50.         // 如果全部数据写完了,则从监听列表中清除该连接的写事件
  51.         if(link->output->empty()){
  52.             fdes->clr(link->fd(), FDEVENT_OUT);
  53.         }
  54.     }
  55.     return 0;
  56. }

1.4    对ready_list的处理:

1.    对ready_list中的每个Link调用recv函数解析,recv函数会根据格式试图从Link的读缓冲区中解析出完整的请求,如果当前的数据还不够解析出一个完整的请求,则返回的req为empty,这样会继续监听该Link的读事件,希望读取更多的数据。如果解析出完整的请求,将构造JOB(任务)分发处理。
2.    proc函数将根据JOB对任务进行分发处理:读任务线程池,写任务线程池,当前线程处理。

点击(此处)折叠或打开

  1. void NetworkServer::proc(ProcJob *job){
  2.     job->serv = this;
  3.     job->result = PROC_OK;
  4.     job->stime = millitime();
  5.     
  6.     // 获取解析的Request
  7.     const Request *req = job->link->last_recv();
  8.     Response resp;

  9.     do{
  10.         // AUTH
  11.         // 检查是否需要Auth,否则发送失败的Response
  12.         if(this->need_auth && job->link->auth == false && req->at(0) != "auth"){
  13.             resp.push_back("noauth");
  14.             resp.push_back("authentication required");
  15.             break;
  16.         }
  17.         
  18.         // 从注册的处理函数表中找出对应的处理函数构造Command
  19.         Command *cmd = proc_map.get_proc(req->at(0));
  20.         if(!cmd){
  21.             resp.push_back("client_error");
  22.             resp.push_back("Unknown Command: " + req->at(0).String());
  23.             break;
  24.         }
  25.         job->cmd = cmd;
  26.         
  27.         // FLAG_THREAD:表明任务需要在其他线程中执行
  28.         if(cmd->flags & Command::FLAG_THREAD){
  29.             // 表明是需要在子线程中执行的写任务
  30.             if(cmd->flags & Command::FLAG_WRITE){
  31.                 job->result = PROC_THREAD;
  32.                 writer->push(*job);
  33.             }else{
  34.             // 在子线程中执行的读任务
  35.                 job->result = PROC_THREAD;
  36.                 reader->push(*job);
  37.             }
  38.             return;
  39.         }
  40.         // 某些任务不用再子线程中执行,直接在主线程中执行并构造处理结果
  41.         proc_t p = cmd->proc;
  42.         job->time_wait = 1000 * (millitime() - job->stime);
  43.         job->result = (*p)(this, job->link, *req, &resp);
  44.         job->time_proc = 1000 * (millitime() - job->stime) - job->time_wait;
  45.     }while(0);
  46.     
  47.     // send函数只是将Response放到link的写缓冲区中,并不真正的发送给客户端
  48.     if(job->link->send(resp.resp) == -1){
  49.         job->result = PROC_ERROR;
  50.     }else{
  51.         if(log_level() >= Logger::LEVEL_DEBUG){
  52.             log_debug("w:%.3f,p:%.3f, req: %s, resp: %s",
  53.                 job->time_wait, job->time_proc,
  54.                 serialize_req(*req).c_str(),
  55.                 serialize_req(resp.resp).c_str());
  56.         }
  57.     }
  58. }



阅读(2582) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~