在SSDBServer构造阶段创建了一个BackendDump对象,它就是用来备份数据的。不过它对数据的备份并不是自发进行的(比如每隔一段时间),而是根据客户端发送的命令备份数据,然后将备份的数据发送给客户端。
为了不让备份任务阻塞其他的服务,BackendDump通过子线程来执行备份任务。
backend_dump.h
-
#ifndef SSDB_BACKEND_DUMP_H_
-
#define SSDB_BACKEND_DUMP_H_
-
-
#include "include.h"
-
#include "ssdb/ssdb.h"
-
#include "net/link.h"
-
-
class BackendDump{
-
private:
-
/* 子线程的参数*/
-
struct run_arg{
-
const Link *link;
-
const BackendDump *backend;
-
};
-
/* 线程函数 */
-
static void* _run_thread(void *arg);
-
SSDB *ssdb;
-
public:
-
BackendDump(SSDB *ssdb);
-
~BackendDump();
-
void proc(const Link *link);
-
};
-
-
#endif
1. 在NetworkServer接收到客户端的dump命令时,会执行proc_dump函数,该函数会执行BackendDump::proc
-
int proc_dump(NetworkServer *net, Link *link, const Request &req, Response *resp){
-
SSDBServer *serv = (SSDBServer *)net->data;
-
serv->backend_dump->proc(link);
-
return PROC_BACKEND;
-
}
2. BackendDump::proc函数会启动新线程去执行_run_thread函数。
3. _run_thread函数主要有3个步骤:
(1)根据Link解析出来的客户端请求获取备份的起点start,终点end,长度limit。
(2)利用leveldb的iterator函数获取该范围内的数据:
Iterator *it = backend->ssdb->iterator(start, end, limit);
(3)将数据写入到Link的输出缓冲区,发送给客户端,子线程退出。
-
void* BackendDump::_run_thread(void *arg){
-
struct run_arg *p = (struct run_arg*)arg;
-
const BackendDump *backend = p->backend;
-
Link *link = (Link *)p->link;
-
delete p;
-
-
//将这个链接上的读写设置为阻塞模式,因为在此之后对客户端写数据不再通过事件通知,而是阻塞写
-
link->noblock(false);
-
-
const std::vector<Bytes>* req = link->last_recv();
-
// step1 : 获取备份的起点,终点,长度
-
std::string start = "";
-
if(req->size() > 1){
-
Bytes b = req->at(1);
-
start.assign(b.data(), b.size());
-
}
-
if(start.empty()){
-
start = "A";
-
}
-
std::string end = "";
-
if(req->size() > 2){
-
Bytes b = req->at(2);
-
end.assign(b.data(), b.size());
-
}
-
uint64_t limit = 10;
-
if(req->size() > 3){
-
Bytes b = req->at(3);
-
limit = b.Uint64();
-
}
-
-
log_info("fd: %d, begin to dump data: '%s', '%s', %" PRIu64 "",
-
link->fd(), start.c_str(), end.c_str(), limit);
-
-
Buffer *output = link->output;
-
-
int count = 0;
-
bool quit = false;
-
// step2 : 从底层的leveldb中获取对应的数据
-
Iterator *it = backend->ssdb->iterator(start, end, limit);
-
-
link->send("begin");
-
// step3 : 将数据按每32K字节写到客户端
-
while(!quit){
-
if(!it->next()){
-
quit = true;
-
char buf[20];
-
snprintf(buf, sizeof(buf), "%d", count);
-
link->send("end", buf);
-
}else{
-
count ++;
-
Bytes key = it->key();
-
Bytes val = it->val();
-
-
output->append_record("set");
-
output->append_record(key);
-
output->append_record(val);
-
output->append('\n');
-
// 当link的输出缓冲区没有到达32K字节时,继续写入到输出缓冲区
-
if(output->size() < 32 * 1024){
-
continue;
-
}
-
}
-
// 输出缓冲区到达32K字节时,写到客户端
-
if(link->flush() == -1){
-
log_error("fd: %d, send error: %s", link->fd(), strerror(errno));
-
break;
-
}
-
}
-
// wait for client to close connection,
-
// or client may get a "Connection reset by peer" error.
-
link->read();
-
-
log_info("fd: %d, delete link", link->fd());
-
delete link;
-
delete it;
-
return (void *)NULL;
-
}
注意:退出前调用 link->read是为了防止客户端出现 “
Connection reset by peer” 错误,客户端在读取完收到的数据之后,会关闭连接,服务端link->read会返回0,然后服务端就可以知道客户端已经关闭了连接,从而退出。
对于 ”Connect reset by peer“ 和 ”Broken pipe“ 错误,可以看看
http://lovestblog.cn/blog/2014/05/20/tcp-broken-pipe/
阅读(2500) | 评论(0) | 转发(0) |