分类: C/C++
2013-11-15 16:36:52
原文地址:Memcached内部数据处理过程解析 作者:CUKdd
阅读本博文的朋友们,最好先阅读一下我的另外三篇博文(扫扫盲):
《Memcached软件源码级执行流程解读》
《Memcached内部协议解析》
《Memcached多线程技术解析》
写在前面:解读过程中使用的术语或变量名称,尽量和源码中的保持一致。这样,便于大家学习参考。若是某些名称和源码中的存在冲突,则以源码为准。该软件版本是:Memcached-1.4.5
OK!现在一切都准备就绪,接下来的工作就是等待coming-in的数据,并对其处理。接下来,我们先以TCP为例,解读其工作过程。
假如现在有一个客户端已经和服务器监听端口建立了一个链接,那么接下来libevent就会调用注册到监听端口的处理函数,也就是event_handler()函数,其主参数就是与该链接相关的struct conn *conn.该函数就是memcached内部协议处理的入口函数。
从这里开始,我们将经历一段痛苦,紧张而又有点兴奋的旅程。让我们马上开始吧...... Let's go!
接下来调用的函数是drive_machine()。这是在解读memcached时,首次提到的函数,它将会处触发后续数据处理的整个流程。该函数是一个体型比较大的自动机处理函数,在这里可以通过一个较大的switch结构,管理协议中的所有分支。
这个状态是连接刚刚开始时,后续数据处理的起始状态。在该状态中,需要调用accept()函数从已连接队列中取出一个连接,并返回套接口描述符sfd. accepte()函数成功返回后,紧接着就是设置sfd为非阻塞套接口(fcntl(sfd, F_SETFL, O_NONBLOCK))。此刻以后对该套接口的操作将都是非阻塞的。注意对阻塞的套接口的(I/O)操作并非适用于非阻塞套接口,这里会涉及到很多技术方法和细节(下文中有讲解),所以若是在软件设计中也使用了非阻塞文件描述符时,也要特别注意这些,希望这里的讲解对以后您的工作有帮助。那么,接下来需要把刚刚接受到的新连接递交给工作线程,这里会涉及到线程间的通信(利用管道方式),也是线程之间协同进行工作的关键,完成此项工作的函数就是dispatch_comm_new()。
其原型如下:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport);
该函数主要是用来派遣一个新的连接到其他的线程。线程的选择算法采用轮询的方式进行。然后,通过cqi_new()函数从全局空闲连接管理队列中取出一个item(TYPE is CQ_ITEM *),通过cq_push()函数将该item添加到选定线程的new_conn_queue上。
在函数cq_push()中,由于操作的new_conn_queue是多个线程(主线程和该线程)共享的数据结构,因此对它的操作需要mutex锁的保护。最后,更重要的是该线程可能由于没有可供运转的连接资源而处于挂起状态,那么这里就需要通过pthread_cond_signal(&cq->cond)函数将工作线程唤醒。
由于待唤醒的工作线程马上要接替主线程完成对新连接后续的处理,那么在真正的通过管道将新连接通知给工作线程之前,还需要在dispatch_conn_new()函数做一点点的初始化工作:首先需要将标识新连接的套接口描述符sfd赋值到item->sfd, 连接在memcached内部所处的状态conn_new_cmd设置到item->init_state中以及在对该新连接处理时需要让libevent关心的事件标志设置到item->event_flags中;其次,设置item->read_buffer_size为DATA_BUFFER_SIZE(=2048);最后,设置该套接口的类型为tcp_transport (这里可设置的类型可以有如下三种:local_transport(Unix sockets),tcp_transport,udp_transport)。完成以上初始化工作后,就可以调用write()函数向管道的写端thread->notify_send_fd上发送一个字节的数据了(注意这里没有什么原因说是为什么会是这样,这只是memcached内部采取的一个技术措施而已,当然你也可以采用其他的可行方法)。当线程工作线程读到写端发送来的一个字节后,就知道有新的连接已经就绪,可以开始工作了,其工作工程见下文。
这里还需要说明的是,当accpet()出错时,我们需要对其进行不同的错误处理。由于监听的套接口是非阻塞的,所以它有可能会返回EAGAIN或是EWOULDBLOCK错误(区别于不同的系统),那么这时应该立即结束本次处理,而将控制返还到libevent中。但错误若是EMFILE,说明此时系统中建立的连接数量太多而无法继续建立连接了,那么就需要通过accept_new_conns()->do_accept_new_conns()调用listen()函数,将backlog设置成零(注意,在已经处于监听状态的套接口上再次调用listen()函数,可以修改其配置信息,如backlog),借此来通知系统不再继续接收新的链接了。同时也会立即结束本次处理,而将控制返还到libevent中。但返回了其他的错误,那么此时就是真的出错了,带打印一条出错信息后也立即结束本次处理,将控制返还到libevent中。
在conn_listening状态中,我们已经告诉工作线程即将进入的状态——conn_new_cmd,那么接下来该状态的处理自然就由工作线程来进行了。我们来看看通过pthread_cond_signal()和write()来唤醒的线程的工作过程。
从上文中可以知道,当管道的读端.notify_receive_fd上有数据可读时,libevent就会触发函数thread_libevent_process()函数的执行。于是,当主线程通过wirte()向.notify_send_fd上写了一个字节的数据后,libevent就会调度thread_libevent_process()函数执行了。在该函数里,首先调用read()函数将管道中的一个字节的数据取出;然后,从操作该线程的结构中的new_conn_queue上通过cp_pop()函数去除一个待处理的item。接下来就需要为新连接item->sfd创建链接管理资源了,实质上是一个指向conn结构的指针c。c的分配函数是conn_new()函数,在该函数中执行了对c中成员的初始化,但重要的两点是,它把sfd和c同时注册到了属于该线程的libevent中,同时将sfd的状态置成了conn_new_cmd,那么此时与其对应的处理函数是event_handler()。当conn_new()函数成功返回后,用于线程间传递消息的item(Type is CQ_ITEM*) 也就不再被需要,而被返还到全局连接管理队列中了。
当libevent检测到sfd上有数据可读时,就会触发event_handler()函数,那么此时再通过event_handler()而进入drive_machine()函数的话,它就要从conn_new_cmd状态开始了。对该状态的处理,需要依据配置中的settings.reqs_per_event。若是它的值是1,则每次event事件,就只处理一个request,但若是大于1,则每次event事件,就通过while(){}循环处理多次。不论是哪种情况,都需要调用reset_cmd_handler()函数。在该函数中,首先将c->cmd,c->substate设置成-1和bin_no_state;其中substate是在binary传输模式下的用于标识自动机转换状态的变量。随后检测c中的item是否已经返还到slabclass中,若否则调用item_remove()函数将其返还。接下来调用conn_shrink()函数,根据当前c中表示各个缓冲区大小的参数与HIGHWAT的比较结果进行调整,realloc过程这里不进行详述。而后根据c中是否已经接收到请求数据把c所处的状态调整为conn_parse_cmd和conn_waiting。假如还没接收到任何请求数据,那么此时调用函数conn_set_state()的参数是conn_waiting。
接上一步末尾的假设部分,待到客户端的请求数据到来后,drive_machine()就会进入conn_waiting状态的处理过程中。在这里需要先将libevent对c监控的事件类型修改为EV_READ|EV_PERSIST(当然,若原来监控的事件就是EV_READ|EV_PERSIST,那么就不需要进行修改了),若这里出错的话,那么就需要通过函数conn_set_state()函数将c的状态调整到conn_closing。待下次libevent触发drive_machine()函数时,就会在conn_closing状态中将c关闭。若前面一切都没有错误发生的话,在该状态处理的末尾会将c的状态置为conn_read。那么待libevent下次触发drive_machine()时,就会进入该状态。
当自动机转移到这一步的时候,需要先判断与c对应的套接口描述符是TCP还是UDP类型的。res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);其中,try_read_udp()是读取UDP数据的函数,try_read_network()是用来读取TCP数据的函数。我们先跳过上面两个函数的分析,直接看对它们返回值的处理。若是返回READ_NO_DATA_RECEIVED,则将c->state设置成conn_waiting;若是返回READ_DATA_RECEIVED,则将c->state设置程conn_parse_cmd;若是返回READ_ERROR,则将c->state设置成conn_closing;若是返回READ_MEMORY_ERROR,表示没有足够多的内存供使用了,这时应将c->state设置成conn_closing,需要注意的是这个出错状态的处理是在try_read_network()函数中进行的。下面我们来看看try_read_udp()和try_read_network()函数。先来分析下try_read_udp()函数。在该函数中,通过调用recvfrom()函数接收一个UDP数据报,若长度小于或等于8个字节,则说明这是一个不完整的数据报(病态数据报)或者是还没有数据传来,此时函数返回READ_NO_DATA_RECEIVED。若是长度大于8个字节,则表示接收到了一个数据包,那么该数据报的第0~1个字节是client填充的请求ID,第2~3字节是序列号,4~5是这个消息中的数据报总数,6~7是保留位,必须为零。(协议解析详见:memcached协议详细解析)但是,目前memcached不支持多数据报的请求,服务器接收到这样的请求时会将其丢弃。这里需要注意的是接收到的数据存放在c->rbuf。当解析完协议帧头后,memcached通过memmove()函数将第8字节(包含第8字节)之后的数据前向移动了8个字节,也就是舍弃掉了帧头的8个字节。同时将接收的到的字节数(不包含帧头的8个字节)累加到c->rbytes上,请c->rcurr设置成了c->rbuf。到了这里,函数将返回READ_DATA_RECEIVED。try_read_network()函数首先查看当前数据的处理位置是否是在缓冲区开始的位置,若否,则将其调整至那里,同时若c->rbytes非零,这需要将c->rcurr处开始的c->rbytes字节数据memmove()到c->rbuf处。这里的数据接收函数是recv()。当接收到的数据超过了缓冲区容许的大小时,需要通过realloc()函数扩容,但是最多扩容四次,也就是最大能扩大到32KB。这里收到的数据都是请求数据,如果已经完整的读取了一行,那么memcached将进入到下面的一个状态。
上面的一个状态成功处理后,memcached就会进入这个状态中。在这里有一个处理函数,用于解析请求命令,并根据它进行相应的处理,但是函数的名字为try_read_command(),有点难懂,个人觉得try_parse_command()会更好些^_^try_read_command()函数在对请求帧头的处理时,将区分binary_prot和ascii_prot模式。若是binary_prot模式的话,那么这时就用c->binary_header(Type is protocol_binary_request_header)接收到来的请求数据头部。其中需要关键处理的几个字段是keylen(键长度)、bodylen(数据长度)、cas(检查数据版本的标识)和cmd(命令)。而后通过调用dispatch_bin_command()函数将下一步的操作派遣出去——针对不同的命令,下一步操作可能会是转入conn_nread状态,也可能是直接向客户端产生回应数据(如version请求)。若是ascii_prot模式的话,首先检查缓冲区的数据是否是一个完整行(依据是数据中是否有'\n'),若否的话,则该请求可能是一个多数据包的get请求,否则断开连接;若是的话,通过调用process_command()函数对 memcached内部协议解析(ascii_prot) 中讲解的协议中各个命令进行解析、处理。
在这个状态中,memcached试图读取长度为n字节的数据(可能需要多次才能读取完整),当c->rlbtyes为0时,数据读取完毕。那么此时通过调用complete_nread()函数,根据conn_parse_cmd状态中对命令的分析结果去处理这些数据。处理过程将按照 memcached内部协议解析(ascii_prot) 和 memcached内部协议解析(binary_prot) 中讲解的进行。但是数据存储于检索的过程详见 memcached缓存管理技术。
在该状态中,从socket中读取到的数据存储在c->ritem中,表示读取的是item的数据值。
该状态也是用来读取数据的,这里读取的数据存储在c->rbuf中,且当c->sbytes为零时,表示数据被成功的接收完毕了,此时memcached在下次进入drive_machine()时就会进入conn_new_cmd状态中。c->rbuf是用来接收命令的缓冲区,但这里用来接收(吞噬)不必存储的w/o数据。
这个状态主要用来向客户端回送一个应答。如果还没准备好的话,那么就会将c->wcurr中的c->wbytes个字节数据挂接到c->msglist[]中,等待条件允许的情况出现时,将其发送出去。该挂接工作在add_iov()函数中完成。msglist是一个长为c->msgsize个struct msghdr结构的缓冲区,每一个struct msghdr槽位存放一个消息数据,消息数据实际存放在struct msghdr结构中的msg_iov成员的iov_base中。其中msg_iov是一个struct iovec结构指针,其数量有struct msghdr结构中的msg_iovlen成员变量指示。欲了解struct iovec和struct msghdr结构及使用方法,请研读《UNIX 网络编程 第1卷:套接口API (第3版)》的P333~P341中相关的部分。
注意,源码中该状态的处理会through到下面的一个状态中。
这是一个数据发送的状态。在这个状态中,首先检查一下待发送的数据是否是UDP数据,若是的话,则调用build_udp_headers()函数构建每一个c->msglist[](简称消息)中的UDP头部。
build_udp_headers()函数用于构建UDP头部并且负责将其组合成待发送的消息。需要说明的是对c->msglist操作时启辅助作用的几个成员变量:
接下来就需要构建UDP头部了,细节这里不作讲述。但是有一点需要说明的是构建的每一个UDP头部都存在于每一个消息c->msglist[i]的第一个msg_iov槽位中。也就是在c->msglist[i].msg_iov[0].iov_base中。这里可以看出,使用struct msghdr结构作为数据发送与接收缓冲区是多么的灵活、方便。(当然,这里的发送与接收函数是sendmsg()与recvmgs())。另外,需要注意的是对于要发送的UDP数据报,每一个数据报槽位的第一个msg_iov必须先跳过,也是就是填充待发送数据时应从第二个msg_iov开始,第一个留作填充UDP头部。
- c->msgused
表示当前已经使用的c->msglist槽位的数量。也就是待发送的消息的数量。这里每个槽位的大小为sizeof(struct msghdr)- c->hdrsize
表示可用于构建UDP头部的槽位数量,这些槽位由c->hdrbuf指针变量指示。这里每个槽位的大小为UDP_HEADER_SIZE(=8B)。若是c->hdrsize小于c->msgused的话,说明当前的槽位数量不足,需要扩容。扩容时采用realloc()函数。
到了这里,不论是UDP数据报还是TCP数据包,都已经构建完毕,随后就是条用transmit()函数将它们发送出去。当然发送函数是sendmsg()。
transmit()函数由于从msglist缓冲区结构中发送下一个chuck数据。由于每一个struct msghdr结构中可以有struct msghdr.msg_iovlen个发送缓冲区,因此每成功调用一次sendmsg()函数就要逐个检查每一个发送缓冲区中的数据是否已经被成功的发送完毕。在检查时,要借助struct iovec结构中的iov_len成员(该成员用于指示成员iov_base缓冲区中有效数据的长度)。检查过程略。
待数据成功被发送出去后,若该socket是UDP,则下一步将使drive_machine()函数进入conn_read状态,以便继续读取数据。但若是TCP的话,那么下一步将会在conn_closing状态中关闭该套接口。
这里将会关闭一个套接口。关闭时需要将与该套接口相关联的资源释放掉。其中会涉及到一个操作item的函数item_remove()。因为c->item中的item(智能指针)是从slab中获得的,那么对于它的详细释放过程详见我的另一篇博文memcached缓存管理技术。