Chinaunix首页 | 论坛 | 博客
  • 博客访问: 5783501
  • 博文数量: 291
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7924
  • 用 户 组: 普通用户
  • 注册时间: 2016-07-06 14:28
个人简介

阿里巴巴是个快乐的青年

文章分类

全部博文(291)

文章存档

2018年(21)

2017年(4)

2016年(5)

2015年(17)

2014年(68)

2013年(174)

2012年(2)

分类: NOSQL

2013-04-03 18:21:56

一、网络模型
        Memcache网络底层采用的Libevent,其网络模型就是大名鼎鼎的半同步半异步。关于Libevent可以参看我之前写的一篇拙文《Libevent源码解析--事件处理框架》,半同步半异步可以看。
        在阅读本文前,建议先阅读《Libevent源码解析--事件处理框架》,这样很多概念便于理解,很多组件或术语在这儿不再赘述。

二、多线程模型
1、关键结构
        老规矩,先介绍下关键的数据结构:
        (1)CQ_ITEM
        CQ_ITEM定义在Thread.c/21行,具体如下:
        typedef struct conn_queue_item CQ_ITEM;
        struct conn_queue_item {
                int               sfd;
                enum conn_states  init_state;
                int               event_flags;
                int               read_buffer_size;
                enum network_transport     transport;
                CQ_ITEM          *next;
        };
        可以将这个结构体看着是主线程accept触发时即有客户端连入时,主线程写入工作线程有关socket连接相关句柄数据结构,绑定了socket描述符、状态、发生的事件、读buffer大小等,看着解释应该不难对号入座吧
        (2)CQ
        conn_queue定义在Thread.c/33行,具体如下:

        typedef struct conn_queue CQ;
        struct conn_queue {
            CQ_ITEM *head;
            CQ_ITEM *tail;
            pthread_mutex_t lock;
            pthread_cond_t  cond;
        };
        这是socket连接通知队列,这句都看不明白的打PP
        (3)LIBEVENT_THREAD
        LIBEVENT_THREAD定义在Memcached.h/351行,具体如下:
        typedef struct {
                pthread_t thread_id;        /* unique ID of this thread */
                struct event_base *base;    /* libevent handle this thread uses */
                struct event notify_event;  /* listen event for notify pipe */
                int notify_receive_fd;      /* receiving end of notify pipe */
                int notify_send_fd;         /* sending end of notify pipe */
                struct thread_stats stats;  /* Stats generated by this thread */
                struct conn_queue *new_conn_queue; /* queue of new connections to handle */
                cache_t *suffix_cache;      /* suffix cache */
                uint8_t item_lock_type;     /* use fine-grained or global item lock */
        } LIBEVENT_THREAD;
        可以将这个结构体看着是线程句柄数据结构,绑定了线程ID、Libevent实例、用于通知管道的event、通知接收的socket描述符,通知发送的socket描述符、socketl连接通知队列,这也很好对号入座吧
        (4)conn
        conn定义在Memcached.h/371行,具体如下:
        typedef struct conn conn;
        struct conn {
            int    sfd;
            ...
            struct event event;
            short  ev_flags;
            short  which;   /** which events were just triggered */
            ...
            LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
        };
        这个结构非常庞大,提取关键的几个字段来分析一下,
 可以将这个结构体看着是socket连接句柄数据结构,绑定了socket描述符、触发的事件、处理连接的线程指针等,这个也很好对号入座吧
2、整体流程       
       整体流程如下图:

图1 整体流程
        (1)在main函数中调用main_base = event_init()来初始化主线程Libevent实例(Memcached.c/5125)。
        (2)在main函数中调用thread_init(Thread.c/772)来初始化工作线程(Memcached.c/5142),并将主线程Libevent实例作为参数传入。
 
        (3)在thread_init函数中为指定数量的工作线程分配内存(Thread.c/810行),为每个线程创建管道(Thread.c/821行),并分别绑定到通知收(Thread.c/826行)和发(Thread.c/827行的socket描述符上,调用函数setup_thread初始化线程信息(Thread.c/328行),调用函数create_worker为每个线程注册回调函数(Thread.c/301行)。关键代码秀一下:
        for (i = 0; i < nthreads; i++) {
                int fds[2];
                if (pipe(fds)) {
                        ...
                }

                threads[i].notify_receive_fd = fds[0];
                threads[i].notify_send_fd = fds[1];

               setup_thread(&threads[i]);
               ...
        }

        /* Create threads after we've done all the libevent setup. */
        for (i = 0; i < nthreads; i++) {
            create_worker(worker_libevent, &threads[i]);
        }
        (4)在setup_thread函数中,为工作线程初始化Libevent实例(Thread.c/329行),为主线程通知读(notify_receive_fd)注册回调函数thread_libevent_process(Thread.c/336行),初始化cq队列(Thread.c/350行),关键代码如下:
        static void setup_thread(LIBEVENT_THREAD *me) {
                me->base = event_init();
                ...
                /* Listen for notifications from other threads */
                event_set(&me->notify_event, me->notify_receive_fd,
                          EV_READ | EV_PERSIST, thread_libevent_process, me);
                event_base_set(me->base, &me->notify_event);

                if (event_add(&me->notify_event, 0) == -1) {
                        ...
                }

                me->new_conn_queue = malloc(sizeof(struct conn_queue));
                ...
                cq_init(me->new_conn_queue);
                ...
        }
        (5)在thread_libevent_process函数中,读取主线程发送的通知接收消息(Thread.c/398行),将主线程accept来的fd注册到工作线程的Libevent实例中(Thread.c/407行),主线程accept来的fd从conn_queue队列获取(Thread.c/404行),关键代码如下:
        static void thread_libevent_process(int fd, short which, void *arg) {
                LIBEVENT_THREAD *me = arg;
                CQ_ITEM *item;
                char buf[1];

                if (read(fd, buf, 1) != 1)
                        ...

                switch (buf[0]) {
                case 'c':
                item = cq_pop(me->new_conn_queue);

                if (NULL != item) {
                        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                                                        item->read_buffer_size, item->transport, me->base);
               ...
                }
        }
        (6)在函数conn_new中,创建conn句柄(Memcached.c/358行),为句柄注册回调函数event_handler处理事件(Memcached.c/452行),将该句柄作为参数传入回调函数并设置到Libevent中(Memcached.c/453行),该函数的关键代码如下:
        conn *conn_new(const int sfd, enum conn_states init_state,
                        const int event_flags,
                        const int read_buffer_size, enum network_transport transport,
                        struct event_base *base) {
            conn *c = conn_from_freelist();

            if (NULL == c) {
                if (!(c = (conn *)calloc(1, sizeof(conn)))) {
                        ...
            }
           ...
           event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
           event_base_set(base, &c->event);
           c->ev_flags = event_flags;

           if (event_add(&c->event, 0) == -1) {
                  ...
           }
           ...
        }
        (7)在create_worker函数中,创建工作线程并注册回调函数(Thread.c/308行),在工作线程的回调函数work_libevent中,开始Libevent主循环(Thread.c/384行)。
        (8)在main函数中,调用函数server_sockets(Memcached.c/5199行),再调用函数server_socket(Memcached.c/4299行),进而调用函数new_socket(Memcached.c/4164行),在调用函数conn_new(Memcached.c/4252行),创建并注册listen fd到主线程Libevent实例上,最后开始Libevent主循环即event_base_loop(Memcached.c/5228行)。conn_new函数关键代码见步骤(6)
        (9)在event_handler函数中,调用函数drive_machine(Memcached.c/4065行),在该函数中处理所有事件,其关键代码如下:
        static void drive_machine(conn *c) {
                ...

                while (!stop) {

                    switch(c->state) {
                        case conn_listening:
                                addrlen = sizeof(addr);
                                if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                                       ...                                      
                                }
                               ...
                               if (settings.maxconns_fast &&
                                    stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                                        ...
                               } else {
                                    dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                             DATA_BUFFER_SIZE, tcp_transport);
                              }

                            stop = true;
                            break;
                            ...
                        }
                    }

                return;
        }
        在处理事件时,如果是listening事件,则调用函数dispatch_conn_newMemcached.c/3785行)将accept fd分配给工作线程。
        (10)在dispatch_conn_new函数中,根据round-robin算法(Thread.c/450行)将新连接push到所分配线程(Thread.c/452行)的CQ队列中(Thread.c/462行),并通过管道发送通知消息“c”(Thread.c/466行),关键代码如下:
        void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                               int read_buffer_size, enum network_transport transport) {
                    CQ_ITEM *item = cqi_new();
                    char buf[1];
                    int tid = (last_thread + 1) % settings.num_threads;

                    LIBEVENT_THREAD *thread = threads + tid;

                    last_thread = tid;

                    ...

                    cq_push(thread->new_conn_queue, item);
                    ...
                    buf[0] = 'c';
                    if (write(thread->notify_send_fd, buf, 1) != 1) {
                        perror("Writing to thread notify pipe");
                    }
        }
        dispatch_conn_new函数只在主线程中调用,last_thread为静态变量,每次将该变量值+1,再模线程数来选择工作线程。
3、线程模型
        Libevent本身是单线程的,Memcached采用消息通知+同步层机制使得其支持多线程,整体模型见如下神图


图2 线程模型
        上图是流传已经的神图,先拿来主义,源码见前面剖析部分,不再重复,图中也描述得很详细,归纳起来就如下:
        每个线程包括主线程都各自有独立的Libevent实例,Memcached的listen fd注册到主线程的Libevent实例上,由主线程来accept新的连接,接受新的连接后根据Round-robin算法选择工作线程,将新连接的socket fd封装为CQ_ITEM后push到所选工作线程的CQ队列中,然后主线程(notify_send_fd)通过管道发送字符“c”到工作线程(notify_receive_fd),而notify_receive_fd已经注册到工作线程的Libevent实例上了,这样工作线程就能收到通知“c”,然后从该工作线程的CQ队列中pop出CQ_ITEM进而取出新连接并将fd注册到工作线程的Libevent实例上,从而由工作线程来处理该连接的所有后续事件。

        需要注意的数据:Memcached默认开启线程数为4(Memcached.c/216行),也可以通过参数-t来指定开启线程数(Memcached.c/4865行),当线程数大于64时会给出错误提示(Memcached.c/4874行),建议线程数为小于或等于CPU核数。


阅读(12546) | 评论(3) | 转发(5) |
给主人留下些什么吧!~~

scq2099yt2013-04-07 13:12:12

cwlalx:Memcached可以算是libevent多线程下编程的经典实例了,是值得学习的。呵呵,前几天也刚刚看完memcached的这部分源代码,它的内存池存储技术还是比较突出的。

呵呵,拿Libevent写服务的时候可以参考Memcached

回复 | 举报

cwlalx2013-04-07 10:46:56

Memcached可以算是libevent多线程下编程的经典实例了,是值得学习的。呵呵,前几天也刚刚看完memcached的这部分源代码,它的内存池存储技术还是比较突出的。

scq2099yt2013-04-04 17:15:06

文明上网,理性发言...