一个有目标,为自己的未来努力奋斗的人
分类: C/C++
2011-12-16 11:46:39
在前面一篇博文 Memcached软件源码级执行流程解读 中,以有过对多线程技术的介绍。在那里只是简单的介绍了其初始化,工作实体函数加载的一些过程,并没有对其中涉及到的细节部分作详细描述。在这篇博文中,我们将详解的介绍memcached中的多线程化技术,亲,不要小瞧这些东东,它会让你学到很多知识的^_^我只告诉你了,可别告诉别人俄:-)
写在前面:解读过程中使用的术语或变量名称,尽量和源码中的保持一致。这样,便于大家学习参考。若是某些名称和源码中的存在冲突,则以源码为准。该软件版本是:Memcached-1.4.5
软件启动后,经过一系列初始化流程后,最后会到初始化worker线程的函数中。该函数是thread_init(),其原型如下:
void thread_init(int nthreads, struct event_base *main_base);
该函数主要是用来初始化线程子系统的,它可以创建各种工作线程。其中,nthreads是指要产生的事件处理句柄工作线程的数量;main_base是指主线程的event base。
由于是多线程环境,必然的就需要在这里进行各种锁的初始化,其中有缓存锁cache_lock,状态信息统计锁stats_lock, 信号同步锁init_lock,条件变量init_cond,链接队列项目(item)锁cqi_freelist_lock等等。
接下来开始创建用于线程间通信的管道。其中一个用于读取,保存在threads[i].notify_receive_fd中;另外一个用于发送,保存在threads[i].notify_send_fd中。发送端存在于主线程中,其余的存在于副线程中。创建时通过for循环对nthreads个待创建线程一一创建。这些待创建的nthreads线程由一个全局变量threads(type是LIBEVENT_THREAD),进程通过它来管理各个线程。其中,threads[i].notify_receive_fd用于读取通知的内容,而threads[i].notify_send_fd用于发送通知内容。管道被成功创建之后,也就是pipe(fds)(其中,fds是int fds[2]类型的)成功返回后,就会调用setup_thread()函数,将该线程的处理函数安装在libevent上。该过程一次通过调用如下三个函数就可以完成:
void event_set(struct event *ev, evutil_socket_t fd, short events, void (*call_back)(evutil_socket_t, short, void *), void *arg);
int event_base_set(struct event_base *base, struct event *ev);
int event_add(struct event *ev, const struct timeval *tv);
其中,base需要通过event_init()函数得到,它将采用默认的配置创建一个struct event_base指针变量,你可以通过libevent中提供的配置修改相关函数(这里不作介绍)进行选项定制。通过event_init()函数得到的base指针变量中有一个成员,名为notify_event,可以直接用于上面三个函数中的ev参数。
在这里不得不提一下,源码中此处的call_back函数指针变量所指向的是thread_libevent_process()函数。也就意味着当上面创建的管道描述符.notify_receive_fd上有数据可读时,那么libevent将触发thread_libevent_process()函数调用。需要注意的是,当这种情况发生时,那么它所处的执行环境上下文是在一个线程(线程threads[i])中的。
把线程处理句柄挂载到libevent上后,接下来就需要为该线程后续过程的数据处理申请链接队列处理空间并创建后缀缓存。其中,链接处理队列是由LIBEVENT_THREAD结构中的new_conn_queue成员,也就是源码中me->new_conn_queue来管理的。而后需要通过cq_init()进行初始化。通过该初始化函数可以知道,该队列使用于线程间共享的数据结构,对它的操作需要通过mutex和cond进行线程间同步和条件挂起与唤醒。该线程的后缀缓存,是通过cache_create()函数来创建的,对cache相关操作函数的介绍,详见 memcached缓存管理技术 一文。
待线程处理函数在libevent安装完毕后,就可以通过调用create_worker()函数来创建线程worker_libevent了。这里也是通过for循环对threads个线程一一进行启动的。create_worker()函数很简单,在其中仅仅调用了pthread_create()函数。worker_libevent实质上是一个函数,具体来讲它将是线程的执行实体。在该线程中,它将调用event_base_loop()(libevent库中的)函数来监控.notify_receive_fd管道描述符的数据读取事件,必要的时候就会调用thread_libevent_process()函数。
最后,通过init_cond线程条件变量来迫使主线程等待所有的线程都已被成功创建。而后线程处理句柄的安装和线程的启动就完成了。那么,到了这里函数thread_init()函数也就返回了。
接下来是创建primary_hashtable的维护线程。该工作是在函数start_assoc_maintenance_thread()中进行的。维护线程的执行实体是assoc_maintenance_thread。在assoc_maintenance_thread()函数中主要是通过线程间共享变量expanding来控制的,当expanding为真并且old_hashtable[expand_bucket]位置处的item需要维护(修理:也就是需要重新进行hash计算)时,将会启动维护操作。维护线程的处理算法详解见下文。
到了这里,接下来就是注册memcached系统时钟处理函数,详解见我的另一片博文:memcached软件源码级执行流程详解。需要注意的是,不论是unix套接口还是tcp套接口,对监听端口的监听工作是在主线程中进行的。
在TCP数据的处理过程中,有一个conn_listening状态,该状态的处理是在主线程中进行的,而后的其他状态 均是在工作线程中进行的。在这个状态中,主线程需要完成如下的工作:
这个状态是连接刚刚开始时,后续数据处理的起始状态。在该状态中,需要调用accept()函数从已连接队列中取出一个连接,并返回套接口描述符sfd. accepte()函数成功返回后,紧接着就是设置sfd为非阻塞套接口(fcntl(sfd, F_SETFL, O_NONBLOCK))。此刻以后对该套接口的操作将都是非阻塞的。注意对阻塞的套接口的(I/O)操作并非适用于非阻塞套接口,这里会涉及到很多技术方法和细节(下文中有讲解),所以若是在软件设计中也使用了非阻塞文件描述符时,也要特别注意这些,希望这里的讲解对以后您的工作有帮助。那么,接下来需要把刚刚接受到的新连接递交给工作线程,这里会涉及到线程间的通信(利用管道方式),也是线程之间协同进行工作的关键,完成此项工作的函数就是dispatch_comm_new()。
其原型如下:
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport);
该函数主要是用来派遣一个新的连接到其他的线程。线程的选择算法采用轮询的方式进行。然后,通过cqi_new()函数从全局空闲连接管理队列中取出一个item(TYPE is CQ_ITEM *),通过cq_push()函数将该item添加到选定线程的new_conn_queue上。
在函数cq_push()中,由于操作的new_conn_queue是多个线程(主线程和该线程)共享的数据结构,因此对它的操作需要mutex锁的保护。最后,更重要的是该线程可能由于没有可供运转的连接资源而处于挂起状态,那么这里就需要通过pthread_cond_signal(&cq->cond)函数将工作线程唤醒。
由于待唤醒的工作线程马上要接替主线程完成对新连接后续的处理,那么在真正的通过管道将新连接通知给工作线程之前,还需要在dispatch_conn_new()函数做一点点的初始化工作:首先需要将标识新连接的套接口描述符sfd赋值到item->sfd, 连接在memcached内部所处的状态conn_new_cmd设置到item->init_state中以及在对该新连接处理时需要让libevent关心的事件标志设置到item->event_flags中;其次,设置item->read_buffer_size为DATA_BUFFER_SIZE(=2048);最后,设置该套接口的类型为tcp_transport (这里可设置的类型可以有如下三种:local_transport(Unix sockets),tcp_transport,udp_transport)。完成以上初始化工作后,就可以调用write()函数向管道的写端thread->notify_send_fd上发送一个字节的数据了(注意这里没有什么原因说是为什么会是这样,这只是memcached内部采取的一个技术措施而已,当然你也可以采用其他的可行方法)。当线程工作线程读到写端发送来的一个字节后,就知道有新的连接已经就绪,可以开始工作了,其工作工程见我的另一篇博文 memcached内部数据处理过程解析。
这里还需要说明的是,当accpet()出错时,我们需要对其进行不同的错误处理。由于监听的套接口是非阻塞的,所以它有可能会返回EAGAIN或是EWOULDBLOCK错误(区别于不同的系统),那么这时应该立即结束本次处理,而将控制返还到libevent中。但错误若是EMFILE,说明此时系统中建立的连接数量太多而无法继续建立连接了,那么就需要通过accept_new_conns()->do_accept_new_conns()调用listen()函数,将backlog设置成零(注意,在已经处于监听状态的套接口上再次调用listen()函数,可以修改其配置信息,如backlog),借此来通知系统不再继续接收新的链接了。同时也会立即结束本次处理,而将控制返还到libevent中。但返回了其他的错误,那么此时就是真的出错了,带打印一条出错信息后也立即结束本次处理,将控制返还到libevent中。
对于UDP监听套接口的处理,在主线程的server_socket()函数中调用了dispatch_conn_new()。该函数在上文对TCP数据处理过程的讲解中已有介绍,主要作用来派遣一个新的连接到其他的线程。线程的选择算法采用轮询的方式进行。但是,这里调用dispatch_conn_new()函数时,输入的参数是conn_read,也就是它要告诉工作线程:当你们开始工作时,需要从conn_read开始。从源码中可以看出,对UDP的处理,对监听套接口的监听工作是由所有工作线程进行的。主线程将待监听的套接口描述分发给了所有工作线程,当有数据到来时,那么工作线程就需要负责数据的处理工作了。具体的工作流程详见我的另一篇博文 memcached内部数据处理过程解析。
OK!到了这里,memcached的多线程技术就讲解完了。不知道,亲,你领悟到了没?由于技术的复杂性,这里不便于采用图示的方式直观地将其展示出来,但我后期会争取做到的^_^
欲知多线程环境下,如何利用libevent函数库开发高性能、高可移植性的网络应用程序,那么就认真的研读一下本文吧^_^
祝好!