Chinaunix首页 | 论坛 | 博客
  • 博客访问: 87327
  • 博文数量: 12
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2013-01-23 09:57
文章分类

全部博文(12)

文章存档

2015年(11)

2014年(1)

我的朋友

分类: 服务器与存储

2015-08-15 18:09:26

  在SSDBServer构造阶段创建了一个BackendDump对象,它就是用来备份数据的。不过它对数据的备份并不是自发进行的(比如每隔一段时间),而是根据客户端发送的命令备份数据,然后将备份的数据发送给客户端。
  为了不让备份任务阻塞其他的服务,BackendDump通过子线程来执行备份任务。

    backend_dump.h

点击(此处)折叠或打开

  1. #ifndef SSDB_BACKEND_DUMP_H_
  2. #define SSDB_BACKEND_DUMP_H_

  3. #include "include.h"
  4. #include "ssdb/ssdb.h"
  5. #include "net/link.h"

  6. class BackendDump{
  7. private:
  8.     /* 子线程的参数*/
  9.     struct run_arg{
  10.         const Link *link;   
  11.         const BackendDump *backend;
  12.     };
  13.     /* 线程函数 */
  14.     static void* _run_thread(void *arg);
  15.     SSDB *ssdb;
  16. public:
  17.     BackendDump(SSDB *ssdb);
  18.     ~BackendDump();
  19.     void proc(const Link *link);
  20. };

  21. #endif

    1. 在NetworkServer接收到客户端的dump命令时,会执行proc_dump函数,该函数会执行BackendDump::proc
        

点击(此处)折叠或打开

  1. int proc_dump(NetworkServer *net, Link *link, const Request &req, Response *resp){
  2.     SSDBServer *serv = (SSDBServer *)net->data;
  3.     serv->backend_dump->proc(link);
  4.     return PROC_BACKEND;
  5. }
    2. BackendDump::proc函数会启动新线程去执行_run_thread函数。

    3. _run_thread函数主要有3个步骤:
      (1)根据Link解析出来的客户端请求获取备份的起点start,终点end,长度limit。
      (2)利用leveldb的iterator函数获取该范围内的数据:
              Iterator *it = backend->ssdb->iterator(start, end, limit);
      (3)将数据写入到Link的输出缓冲区,发送给客户端,子线程退出。

点击(此处)折叠或打开

  1. void* BackendDump::_run_thread(void *arg){
  2.     struct run_arg *p = (struct run_arg*)arg;
  3.     const BackendDump *backend = p->backend;
  4.     Link *link = (Link *)p->link;
  5.     delete p;

  6.     //将这个链接上的读写设置为阻塞模式,因为在此之后对客户端写数据不再通过事件通知,而是阻塞写
  7.     link->noblock(false);

  8.     const std::vector<Bytes>* req = link->last_recv();
  9.     // step1 : 获取备份的起点,终点,长度
  10.     std::string start = "";
  11.     if(req->size() > 1){
  12.         Bytes b = req->at(1);
  13.         start.assign(b.data(), b.size());
  14.     }
  15.     if(start.empty()){
  16.         start = "A";
  17.     }
  18.     std::string end = "";
  19.     if(req->size() > 2){
  20.         Bytes b = req->at(2);
  21.         end.assign(b.data(), b.size());
  22.     }
  23.     uint64_t limit = 10;
  24.     if(req->size() > 3){
  25.         Bytes b = req->at(3);
  26.         limit = b.Uint64();
  27.     }

  28.     log_info("fd: %d, begin to dump data: '%s', '%s', %" PRIu64 "",
  29.         link->fd(), start.c_str(), end.c_str(), limit);

  30.     Buffer *output = link->output;

  31.     int count = 0;
  32.     bool quit = false;
  33.     // step2 : 从底层的leveldb中获取对应的数据
  34.     Iterator *it = backend->ssdb->iterator(start, end, limit);
  35.     
  36.     link->send("begin");
  37.     // step3 : 将数据按每32K字节写到客户端
  38.     while(!quit){
  39.         if(!it->next()){
  40.             quit = true;
  41.             char buf[20];
  42.             snprintf(buf, sizeof(buf), "%d", count);
  43.             link->send("end", buf);
  44.         }else{
  45.             count ++;
  46.             Bytes key = it->key();
  47.             Bytes val = it->val();

  48.             output->append_record("set");
  49.             output->append_record(key);
  50.             output->append_record(val);
  51.             output->append('\n');
  52.             // 当link的输出缓冲区没有到达32K字节时,继续写入到输出缓冲区
  53.             if(output->size() < 32 * 1024){
  54.                 continue;
  55.             }
  56.         }
  57.         // 输出缓冲区到达32K字节时,写到客户端
  58.         if(link->flush() == -1){
  59.             log_error("fd: %d, send error: %s", link->fd(), strerror(errno));
  60.             break;
  61.         }
  62.     }
  63.     // wait for client to close connection,
  64.     // or client may get a "Connection reset by peer" error.
  65.     link->read();

  66.     log_info("fd: %d, delete link", link->fd());
  67.     delete link;
  68.     delete it;
  69.     return (void *)NULL;
  70. }

    注意:退出前调用 link->read是为了防止客户端出现   “Connection reset by peer” 错误,客户端在读取完收到的数据之后,会关闭连接,服务端link->read会返回0,然后服务端就可以知道客户端已经关闭了连接,从而退出。

    对于 ”Connect reset by peer“ 和 ”Broken pipe“ 错误,可以看看 
    http://lovestblog.cn/blog/2014/05/20/tcp-broken-pipe/

        

阅读(2500) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~