转自www.cnblogs.com/sunli/
-----------------------
总体设计
------------------------
---------------
结构:
---------------
多线程。三类线程:
主线程(1个):
监听socket,将接收到的请求sockfd分发给工作线程,以及信号处理。
worker线程(thnum个,参数指定,默认为8):
从主线程得到请求sockfd,处理请求(二进制、 HTTP、memcache协议)
timer线程(最多8个):
do_slave (1个):
更新从库。定期(1s)向master请求更新数据日志,用来更新自己的数据库
do_extpc (参数指定,默认0个):
script extension. 提供对脚本和一些命令的支持
线程间联系:
主线程通过全局queue把accept到的请求sockfd 传递给worker线程;
主线程捕捉信号,设置对应的全局变量。worker线程和 timer线程周期性的检查全局变量来获取通知。
----------------------
实现的简单代码描述
(所有代码中错误处理均已被过滤)
----------------------
主线程:
--------
int main(){
... //para config, functions registration, an so on
do {
g_restart = false; //default don't restart
... //re-open log file
signal(SIGTERM, sigtermhandler);
signal(SIGINT, sigtermhandler);
signal(SIGHUP, sigtermhandler); //for SIGHUP, set g_restart = true, serv->term = true
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, sigchldhandler); //do nothing
ttservstart(g_serv); //
}while(g_restart);
... //resource free
}
解释:
前面省略的部分检查参数有效性, < src="http://hi.images.csdn.net/js/blog/tiny_mce/themes/advanced/langs/zh.js" type="text/javascript"> < src="http://hi.images.csdn.net/js/blog/tiny_mce/plugins/syntaxhl/langs/zh.js" type="text/javascript"> 函数注册(do_slave ...),deamonize等;
信号处理:
SIGINT、SIGTERM做相同处理,设置 serv->term = true,这样其它线程检查到该变量的变化后就退出。(信号捕捉函数不直接处理退出而只是设置全局变量,是一种多用的技巧。好处除了保持捕捉函数简洁,还能提供更好的灵活性:线程实现可以选择只在自己"愿意"的地方才检查该全局变量进而做相应处理,保证一些不能被打断的流程的完整性);
SIGHUP一是设置serv->term = true,让所有线程退出;二是设置g_restart = true,让while循环再次执行,启动所有其它线程。除了重启线程,它还重新打开日志文件。(tyrant没配置文件);
SIGPIPE忽略;
SIGCHLD捕捉,但捕捉函数什么也不做。
ttservstart创建worker线程和timer线程,统一accept用户请求并用全局queue将它们分发给worker线程:
bool ttservstart(TTSERV *serv){
... //open listening socket---unix domain for local,tcp for remote
... //create worker threads(do_worker) and timer threads(do_slave, do_extpc)
int epfd = epoll_create(256);
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev); //add listening sockfd to epoll
while(!serv->term){ //SIGTERM,SIGINT
int fdnum = epoll_wait(epfd, events, 256, 200);
for(int i = 0; i < fdnum; i++){
if(events[i].data.fd == lfd){ // listening fd
... //accept and EPOLL_CTL_ADD add to epoll
}
else { //a task
...
pthread_mutex_lock(&serv->qmtx); //mutex lock queue
tclistpush(serv->queue, &cfd, sizeof(cfd)); //push clientfd to queue
pthread_mutex_unlock(&serv->qmtx);
pthread_cond_signal(&serv->qcnd); //signal to worker threads
}
}
... //check threads' timeout,if a thread timeouted,restart it. commonly,it's useless
}
pthread_cond_broadcast(&serv->qcnd); //tell wokers to close
pthread_cond_broadcast(&serv->tcnd); //tell timers to close
... //resource free
}
解释:
创建监听socket:本地使用UNIX域协议(只需本机通信时使用。通过指定参数 -port 为一个小于1的数,比如0)以获得更好的性能,远端使用TCP协议;
创建thnum个worker线程和若干个timer线程 (pthread_create);
在循环中等待请求的到来。通过线程共享的数据结构queue来分配请求任务,queue中放置的是请求sockfd,访问前加互斥锁,push任务后用信号通知worker线程;
一次完整的处理后才检查是否收到信号 (while(!serv->term);
主线程退出前要广播信号通知worker线程和timer线程是因为:
SIGTERM、 SIGINT捕捉函数只是设置serv->term为true;
这时worker线程可能正在pthread_cond_timedwait等待serv->qcnd,(200ms),即正等待主线程给它分配任务;
timer 线程是定期执行的(1s),它现在可能正在周期里。
虽然它们都可以在超时后(200ms,1s)发现到serv->term被设置进而退出,但这样的等待一段时间显然并不是最好。这里直接广播信号让它们马上退出阻塞函数。
这就是主线程的结构,很简单
---------------
worker线程:
---------------
worker从全局queue取请求sockfd并处理它们:
static void *ttservdeqtasks(void *argp){
...
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
sigaddset(&sigset, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &sigset, &oldsigset);
bool empty = false;
while(!serv->term){
pthread_mutex_lock(&serv->qmtx);
...
int code = empty ? pthread_cond_timedwait(&serv->qcnd, &serv->qmtx, &ts) : 0;
if(code == 0 || code == ETIMEDOUT || code == EINTR){
void *val = tclistshift2(serv->queue);
pthread_mutex_unlock(&serv->qmtx)
if(val){
empty = false;
int cfd = *(int *)val;
TTSOCK *sock = ttsocknew(cfd);
bool reuse;
do {
req->mtime = tctime();
req->keep = false;
do_task(sock, serv->opq_task, req);
reuse = false;
if(sock->end){
req->keep = false;
} else if(sock->ep > sock->rp){
reuse = true;
}
} while(reuse);
if(req->keep){
ev.events = EPOLLIN | EPOLLONESHOT;
ev.data.fd = cfd;
epoll_ctl(req->epfd, EPOLL_CTL_MOD, cfd, &ev);
} else {
epoll_ctl(req->epfd, EPOLL_CTL_DEL, cfd, NULL);
}
} else {
empty = true;
}
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_testcancel();
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
req->mtime = tctime();
}
pthread_sigmask(SIG_SETMASK, &oldsigset, NULL);
}
解释:
首先禁止线程结束请求,一次流程后才打开测试。也就是,不允许在流程中终止线程,不然会带来不完整性;
从全局queue取任务fd流程:
给全局queue加上互斥锁
如果上次成功取到fd(empty == false,即上次取时queue不为空),这次直接取
否则等待主线程发信号。不管是收到信号还是超时,都试着去从queue取fd
如果成功取到fd,do task
如果失败,设置标志(empty = true,这次取时queue为空),表示这次没取到fd
解锁
对fd读写的处理用TTSOCK做了封装,它底层一次从fd读取大块数据放入内部buffer(减少网络请求次数),但对上层提供一次读一个字节、一次读指定数目字节的API,方便上层开发。在"tyrant分析- 编程小技巧"里有对它的详细描述。
补充说明:
原则上无碍对"总体设计"理解的代码全部去除,这里保留(1) 是想说明,tyrant里大量使用了这种线程编程技术:开始时disable掉其它线程可能的取消该线程的请求,等一次流程结束后再打开测试是否有请求。目的是保护流程的完整性。
tyrant大量使用的另一个线程编程技术是在对某个资源的处理前pthread_cleanup_push资源释放函数,处理完后再pop。它相当于面向对象里"析构"的意思,确保资源的释放。这里没列出对应的代码,以后的"总体设计"里也都不会列出这些技术细节。
do_task分析请求的格式,根据请求协议和请求命令做相应处理:
static void do_task(TTSOCK *sock, void *opq, TTREQ *req){
int c = ttsockgetc(sock);
if(c == TTMAGICNUM){
switch(ttsockgetc(sock)){
case TTCMDPUT:
do_put(sock, arg, req);
break;
case TTCMDGET:
do_get(sock, arg, req);
break;
case TTCMDREPL:
do_repl(sock, arg, req);
break;
...
}
} else {
ttsockungetc(sock, c);
char *line = ttsockgets2(sock);
int tnum;
char **tokens = tokenize(line, &tnum);
const char *cmd = tokens[0];
if(!strcmp(cmd, "set")){
do_mc_set(sock, arg, req, tokens, tnum);
} else if(!strcmp(cmd, "add")){
do_mc_add(sock, arg, req, tokens, tnum);
} else if ...
...
} else if(tnum > 2 && tcstrfwm(tokens[2], "HTTP/1.")){
...
if(!strcmp(cmd, "GET")){
do_http_get(sock, arg, req, ver, uri);
} else if(!strcmp(cmd, "PUT")){
do_http_put(sock, arg, req, ver, uri);
} else if ...
...
}
}
}
解释:
do_task就是做三种协议的用户请求处理:
tt自己的二进制协议:第一个字节为 TTMAGICNUM(0xc8)
memcache协议: "get|set|add|... ..."
http协议: "GET|PUT|DELETE|... HTTP/1.x ..."
三种协议只是格式不同,提供的服务一致:
get: 得到请求key对应的value
put: 先更新日志(ulog),再更新db
...
当然,http,memcache只是"外部"协议,tyrant只对它们提供一些"对外"服务,如get,put等。tyrant自己的"内部"二进制协议则支持更多的命令,比如主从复制 (do_repl)。
get、put等底层实现是属于cabinet部分,会在"cabinet分析"部分详细描述
传输数据格式是tt自定义的,学习的意义不大,格式也很简单,比如它一般的格式是"ksiz + vsiz + key + value",其中没有'+',是紧挨的,因为siz长度是知道的,所以可以解析。当然还有一些有细微差别的格式。"tyrant分析-主从复制实现"里详细描述了其中一种协议的格式。
-------------
timer线程:
-------------
ttservtimer提供"定时执行"功能,它按timers提供的频率定期调用它们,这样timers只需要专注实现自己的功能:
static void *ttservtimer(void *argp){
TTTIMER *timer = argp;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
double freqi;
double freqd = modf(timer->freq_timed, &freqi);
while(!serv->term){
pthread_mutex_lock(&serv->tmtx);
struct timeval tv;
struct timespec ts;
gettimeofday(&tv, NULL);
ts.tv_sec = tv.tv_sec + (int)freqi;
ts.tv_nsec = tv.tv_usec * 1000 + freqd * 1000000000;
if(ts.tv_nsec >= 1000000000){
ts.tv_nsec -= 1000000000;
ts.tv_sec++;
}
int code = pthread_cond_timedwait(&serv->tcnd, &serv->tmtx, &ts);
if(code == 0 || code == ETIMEDOUT || code == EINTR){
pthread_mutex_unlock(&serv->tmtx);
if(code != 0) timer->do_timed(timer->opq_timed);
}
}
}
解释:
(1)将timer函数的频率时间分割出整数和小数部分,并加上当前时间,作为等待超时时间。do_slave的频率是1秒。
(2)只有在一种情况下才会收到信号tcnd,那是在主线程退出时。这时不执行do_timed < src="http://hi.images.csdn.net/js/blog/tiny_mce/themes/advanced/langs/zh.js" type="text/javascript"> < src="http://hi.images.csdn.net/js/blog/tiny_mce/plugins/syntaxhl/langs/zh.js" type="text/javascript"> ,直接进入下个循环,检查到serv->term为true(被信号捕捉函数设置),退出。
---------------------
timer之do_slave:
---------------------
do_slave用于slave端,它和master建立socket连接,请求更新内容并用之更新自己,实现主从复制:
static void do_slave(void *opq){
REPLARG *arg = opq;
int rtsfd = open(arg->rtspath, O_RDWR | O_CREAT, 00644);
tcread(rtsfd, rtsbuf, tclmin(NUMBUFSIZ - 1, sbuf.st_size));
arg->rts = strtoll(rtsbuf, NULL, 10);
tcreplopen(repl, arg->host, arg->port, arg->rts + 1, sid);
while(!err && !ttserviskilled(g_serv) && !arg->recon &&
(rbuf = tcreplread(repl, &rsiz, &rts, &rsid)) != NULL){
tculogadbredo(adb, rbuf, rsiz, true, ulog, rsid);
lseek(rtsfd, 0, SEEK_SET);
int len = sprintf(rtsbuf, "%llu\n", (unsigned long long)rts);
tcwrite(rtsfd, rtsbuf, len);
}
pthread_cleanup_pop(1);
close(rtsfd);
}
解释:
slave把上次更新到的时间rts放在文件里。每次更新就是从文件取rts,和master建立连接,循环请求到数据并用它们更新自己的ulog和db,然后更新rts文件。 master中负责处理请求的是do_task函数中的do_repl
slave-master的具体实现请参见"tyrant 分析-主从复制实现"。
tokyo tyrant源码分析-主从复制实现[转贴]
来源:http://blog.csdn.net/pingnning/archive/2009/10/24/4724377.aspx
"tyrant分析-总体设计"中已经提到,slave起一个线程(do_slave)做主从复制,它和master建立tcp连接,发送请求命令和起始时间rts +1(上次的更新时间加1秒)给master,然后循环的从master那里接收一条条的记录,更新自己db、ulog和rts file。do_slave是以1秒为频率执行的。(实际是等待一次do_slave执行完毕后,再等待1秒,然后进入下一次的do_slave,依次循环。所以"以1秒为频率执行"的表达似乎并不准确。从下面可以看到一次do_slave有可能执行较长时间)
主从复制是一个主、从交互的过程。本节依次描述协议细节、slave细节、master细节。
------------
协议细节:
do_slave(slave) do_repl(master)
-------------------
| TTMAGICNUM|
| TTCMDREPL |
| ts (+1) |
| sid | send and recv (with timeout)
------------------- ------------------------>
-----------------
send and cnd wait | NOP |
<--------------------- -----------------
-----------------------
| TCULMAGICNUM |
| rts |
| rsid |
| rsiz |
content send | rsiz-content |
<--------------------- ------------------------
next content send
<---------------------
......
rsiz-content格式:
MAGIC + cmd + ksize + vsize + key + value
其中:
cmd: TTCMDPUT | TTCMDOUT | ...
ksize,vsize分别是本条记录的key,value的长度;
slave就根据cmd和key-value对对db进行相应操作。
master的ulog由一条条独立记录组成,每条记录有相同格式:
MAGIC + ts + sid + size + content
其中:
ts : 本条记录对应的时间戳。slave请求时会带上上次更新时间戳,master根据它们来判断需要传送哪些记录给slave;
sid : server id. 唯一标识server。
size : 后面"content"长度
content格式即上面"rsiz-content"的格式,描述了一条key-value对以及对它做的操作命令。
--------------
do_slave流程:
打开rts文件(默认为ttserver.rts),读取上次的rts(replication timestamp);
和master建立socket连接(参数:-mhost,-mport),并设置socket选项:
SO_RCVTIMEO、SO_SNDTIMEO - 发送、接收超时设置为0.25秒
TCP_NODELAY - 禁止nagle算法
发送REPL请求(详见协议细节);
循环:
用recv接收数据;
解析接收数据,根据数据中指定的命令(TTCMDPUT、TTCMDOUT等)更新db和slave自己的ulog;
用接收数据里的最新rts更新slave的rts文件;
最后关闭连接
解释:
1、slave不能因偶然的网络故障之类永远阻塞在send或recv中,这样的话更新就会永远停滞了。所以它要设置发送和接收的超时。如果超时,则这次do_slave失败,等待1秒后进行下一次。send | recv失败时,它并不会用新的rts(可能压根就没请求到它)去更新自己的rts文件,所以下次还是会用旧的rts去请求,所以不会因do_slave失败而导致slave数据不全。
2、禁止nagle算法是因为有小数据的命令包的交互,不能拖延。
3、请求只发送一次,但数据是一直循环接收的。循环失败的条件是:recv失败(或超时),收到SIGINT或SIGTERM,或是更新库失败或写文件失败等;
---------------
do_repl流程:
根据slave的请求ts找到合适的ulog文件(文件名使用数字编号,依次递增),逻辑是:
从编号最大的文件依次往编号小的文件:(编号越大,ulog内容越新,ts越大)
打开文件查看它的第一条记录的ts,如果请求ts大于它,则该文件即为要找的ulog文件。
循环。当对端连接未关闭且没收到SIGINT、SIGTERM信号时:
发送NOP(测试对端连接是否关闭);
pthread_cond_timedwait等待ulog更新信号,超时值为1秒;
循环:
一次读取一条日志记录;
加上头部(MAGIC,rts,rsid,rsiz。见"协议细节");
发送给slave。
当上面读取日志失败或发送失败时,退出循环。
解释:
1、ulog由一条条的记录组成,每条记录有相同格式: MAGIC + ts + sid + size + content
2、因为ulog文件有大小上限,所以写满一个后会写下一个。按上面所说那样,文件名用数字编号,依次递增;
3、找合适ulog文件的逻辑。因为是按内容从新到旧的顺序(也即ts从大到小的顺序)查看文件,所以最先找到的其中第一条记录ts小于slave所请求ts的那个文件就是合适的文件;(该文件里ts会随着一条条记录慢慢增加,直到大于等于请求ts,这时就到了slave需要的数据处);
4、关于这两层循环的逻辑。内层循环一次发送一条记录,它是希望尽可能多地发送记录给slave,直到发送完所有记录(意外发送故障不考虑下)。退出到外层逻辑时希望这时又有ulog更新,能继续进行发送。这两层循环的目的都是希望能尽可能长地维持与slave的一次连接,从而让数据的同步更及时。
tokyo tyrant源码分析——分区存储
代码版本:官网最新版tokyotyrant-1.1.41
代码最后修改时间:2010-02-02
最近项目需要让TT支持分区功能,没办法,时隔两个多月,有重新看了一下代码,发现TT已经支持了一个简单的分区,本文就略微介绍一下。
什么是分区存储?
以mysql的分区存储为例,分区允许根据指定的规则,跨文件系统分配单个表的多个部分。表的不同部分在不同的位置被存储为单独的表。MySQL从5.1.3开始支持Partition。
手动分表 分区多张数据表一张数据表重复数据的风险没有数据重复的风险写入多张表写入一张表没有统一的约束限制强制的约束限制
MySQL支持RANGE,LIST,HASH,KEY分区类型,其中以RANGE最为常用:
Range(范围)–这种模式允许将数据划分不同范围。例如可以将一个表通过年份划分成若干个分区。
Hash(哈希)–这中模式允许通过对表的一个或多个列的Hash Key进行计算,最后通过这个Hash码不同数值对应的数据区域进行分区。例如可以建立一个对表主键进行分区的表。
Key(键值)-上面Hash模式的一种延伸,这里的Hash Key是MySQL系统产生的。
List(预定义列表)–这种模式允许系统通过预定义的列表的值来对数据进行分割。
Composite(复合模式) –以上模式的组合使用
本段引自:http://www.cnblogs.com/acpp/archive/2010/08/09/1795464.html
TT分区存储概览
TT的分区存储叫multiple database,通过启动时的参数-mul指定分区个数,如果-mul大于0,则TT自动进入分区模式。TT并没有实现mysql那么多复杂的分区策略,只实现了简单的Hash分区,每个分区都是一个单独的数据库(hdb,bdb等),也对应单独一个数据库文件。用户需要指定分区的个数,当存取一个item时,TT取按照item->key做hash找到对应数据库,调用数据库接口执行存取操作。
TT分区存储实现
TT的所有数据库操作依赖一个抽象数据库结构——TCADB(tcadb.h:45),对于TT自身支持的存储引擎(hash数据库、btree数据库、定长数据库)等,TCADB都单独提供了变量来挂载,但同时也提供了变量TCADB->skel,即skeleton database,一般用来通过动态库挂载用户自定义存储引擎。TT也巧妙的使用TCADB->skel来挂载分区存储。函数tcadbsetskelmulti(tcadb.c:3279)实现了分区存储的设置,将skel的存储引擎API指向以tcadbmul开头的一系列函数。
如何创建分区存储(tcadbmulopen)
根据用户配置的数据库目录和分区个数,TT会遍历目录下的文件,找出符合固定格式的文件,path/adbmul-xx.hdb等,xx为序列号,然后依次读文件并载入,如果xx小于分区个数则依次创建,并且所有数据库的配置参数都一致,当然,所有分区数据库的存储引擎也都必须一致。
如何存取(tcadbmulput,tcadbmulget)
比较简单了,都是调用tcadbmulidx(),通过hash找到对应的数据库,然后直接调用。
hash算法为:
uint32_t hash = 20090810;
const char *rp = (char *)kbuf + ksiz;
while(ksiz--){
hash = (hash * 29) ^ *(uint8_t *)--rp;
}
return hash % mul->num;
为什么TT只支持了hash分区
原因很简单,hash分区最简单的地方就是TT无需维护哪个item存在哪个分区上的索引,只需要维护一个hash算法,维护成本和占用空间会非常少。但这也带来了一个通用的问题,当用户通过-mul参数更改分区个数时,以往存储的数据会发生存取混乱,建议有此需求的用户,先创建另一个分区数据库,然后将数据从老数据库重新导入。
Tokyo Tyrant (ttserver)的master-slave复制协议分析
Tokyo Tyrant (ttserver)的master-slave复制协议分析
The replication protocol of Tokyo Tyrant (ttserver)
[文章作者:孙立 链接:http://www.cnblogs.com/sunli/ 更新时间:2010-06-26]
目的
ttserver已经自带了主从复制功能,而且运行非常稳定,高效,使我们在使用ttserver实现高可靠性的不二选择。为什么我还要分析它的复制协议呢?
1.需要实时从ttserver同步数据到一个异构库。比如mysql,其他的nosql数据库。
2.数据的无缝迁移,如果需要从ttserver数据库无缝切换到其他数据库,可以使用这种复制协议
3.高性能的异步处理队列,你可以设置主库为mem类型,然后使用同步协议获取数据,这实际上就是一个队列,而且还是一个持久化的队列服务。
4.其他存储写一个服务,可以用ttserver做slave。
协议结构分析
咱们从数据流向的流程开始:
客户端(slave)连接到服务器端(master)
slave连接成功后向master发送
+--------------------------------------+
|0xC8 |0xa0 | slave rts | slave sid |
|1byte |1 byte | 8 bytes | 4 bytes |
+--------------------------------------+
master收到后,想客户端发送:
+---------+
|mid |
|4 bytes |
+---------+
slave收到后可以得到服务器端的master sid.
这里就开始复制了,master继续想客户端循环不断的发送复制的数据的object.
+------+
|0xca | 表示没有任何操作
|1 byte|
+------+
发送一个操作项:
+--------------------------------------------------+
|0xc9 |rts | rsid |rsize |data |
|1 byte |8 bytes |4 bytes |4 bytes |rsize bytes |
+--------------------------------------------------+
其中data的结构:
+-----------------------------------+
|magic |cmd |[data] |
|1 bytes |1 bytes | key-value data |
+-----------------------------------+
cmd表示各种操作类型,比如put putkeep 等
slave循环接受数据进行数据ok了。
协议实现
ttserver的复制协议非常简单,因为官方并没有把复制协议写到文档中,其他操作都有文档的。就需要去分析它的C代码,结构分析出来实现就非常简单了。 我会开放一些实现,到时会在www.cnblogs.com/sunli/公布
阅读(1478) | 评论(0) | 转发(0) |