Chinaunix首页 | 论坛 | 博客
  • 博客访问: 28316
  • 博文数量: 8
  • 博客积分: 210
  • 博客等级: 入伍新兵
  • 技术积分: 105
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-23 12:46
个人简介

ψ(° Д °;)ψ

文章分类

全部博文(8)

文章存档

2013年(2)

2012年(2)

2011年(4)

我的朋友

分类: NOSQL

2013-05-24 17:39:06

 

1. 创建工作线程

thread_init(settings.num_threads, main_base);

   1: /*
   2:  * Initializes the thread subsystem, creating various worker threads.
   3:  *
   4:  * nthreads  Number of worker event handler threads to spawn
   5:  * main_base Event base for main thread
   6:  */
   7: void thread_init(int nthreads, struct event_base *main_base) {
   8:     int         i;
   9:     int         power;
  10:  
  11:     pthread_mutex_init(&cache_lock, NULL);
  12:     pthread_mutex_init(&stats_lock, NULL);
  13:  
  14:     pthread_mutex_init(&init_lock, NULL);
  15:     pthread_cond_init(&init_cond, NULL);
  16:  
  17:     pthread_mutex_init(&cqi_freelist_lock, NULL);
  18:     cqi_freelist = NULL;
  19:  
  20:     /* Want a wide lock table, but don't waste memory */
  21:     if (nthreads < 3) {
  22:         power = 10;
  23:     } else if (nthreads < 4) {
  24:         power = 11;
  25:     } else if (nthreads < 5) {
  26:         power = 12;
  27:     } else {
  28:         /* 8192 buckets, and central locks don't scale much past 5 threads */
  29:         power = 13;
  30:     }
  31:  
  32:     item_lock_count = hashsize(power);
  33:  
  34:     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
  35:     if (! item_locks) {
  36:         perror("Can't allocate item locks");
  37:         exit(1);
  38:     }
  39:     for (i = 0; i < item_lock_count; i++) {
  40:         pthread_mutex_init(&item_locks[i], NULL);
  41:     }
  42:     pthread_key_create(&item_lock_type_key, NULL);
  43:     pthread_mutex_init(&item_global_lock, NULL);
  44:  
  45:     /*
  46:      * Allocate and set up descriptors for the nthreads worker threads
  47:      */
  48:     threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
  49:     if (! threads) {
  50:         perror("Can't allocate thread descriptors");
  51:         exit(1);
  52:     }
  53:  
  54:     dispatcher_thread.base = main_base;
  55:     dispatcher_thread.thread_id = pthread_self();
  56:  
  57:     for (i = 0; i < nthreads; i++) {
  58:         int fds[2];
  59:         if (pipe(fds)) {
  60:             perror("Can't create notify pipe");
  61:             exit(1);
  62:         }
  63:  
  64:         threads[i].notify_receive_fd = fds[0];
  65:         threads[i].notify_send_fd = fds[1];
  66:  
  67:         /* Configure this worker thread (event base, notify event, queues) */
  68:         setup_thread(&threads[i]);
  69:         /* Reserve three fds for the libevent base, and two for the pipe */
  70:         stats.reserved_fds += 5;
  71:     }
  72:  
  73:     /* Create threads after we've done all the libevent setup.
  74:      * Spawn the nthreads worker threads.
  75:      */
  76:     for (i = 0; i < nthreads; i++) {
  77:         create_worker(worker_libevent, &threads[i]);
  78:     }
  79:  
  80:     /* Wait for all the threads to set themselves up before returning.
  81:      * Blocks until every worker_libevent thread has registered itself.
  82:      */
  83:     pthread_mutex_lock(&init_lock);
  84:     wait_for_thread_registration(nthreads);
  85:     pthread_mutex_unlock(&init_lock);
  86: }

该函数主要用于创建设置工作线程。

其中LIBEVENT_THREAD为线程句柄,注释一目了然。结构如下:

   1: typedef struct {
   2:     pthread_t thread_id;        /* unique ID of this thread */
   3:     struct event_base *base;    /* libevent handle this thread uses */
   4:     struct event notify_event;  /* listen event for notify pipe */
   5:     int notify_receive_fd;      /* receiving end of notify pipe */
   6:     int notify_send_fd;         /* sending end of notify pipe */
   7:     struct thread_stats stats;  /* Stats generated by this thread */
   8:     struct conn_queue *new_conn_queue; /* queue of new connections to handle */
   9:     cache_t *suffix_cache;      /* suffix cache */
  10:     uint8_t item_lock_type;     /* use fine-grained or global item lock */
  11: } LIBEVENT_THREAD;


开始为每个工作线程初始化句柄,包括创建pipe,以及在setup_thread中设置事件等。

   1: for (i = 0; i < nthreads; i++) {
   2:     int fds[2];
   3:     if (pipe(fds)) {
   4:         perror("Can't create notify pipe");
   5:         exit(1);
   6:     }
   7:  
   8:     threads[i].notify_receive_fd = fds[0];
   9:     threads[i].notify_send_fd = fds[1];
  10:  
  11:     /* Configure this worker thread's parameters */
  12:     setup_thread(&threads[i]);
  13:     /* Reserve three fds for the libevent base, and two for the pipe */
  14:     stats.reserved_fds += 5;
  15: }
  16:  
  17: ......
  18:  
  19: /*
  20:  * Set up a thread's information.
  21:  */
  22: static void setup_thread(LIBEVENT_THREAD *me) {
  23:     me->base = event_init();
  24:     if (! me->base) {
  25:         fprintf(stderr, "Can't allocate event base\n");
  26:         exit(1);
  27:     }
  28:  
  29:     /* Listen for notifications from other threads
  30:      * (persistent read event on this thread's notify pipe)
  31:      */
  32:     event_set(&me->notify_event, me->notify_receive_fd,
  33:               EV_READ | EV_PERSIST, thread_libevent_process, me);
  34:     event_base_set(me->base, &me->notify_event);
  35:  
  36:     if (event_add(&me->notify_event, 0) == -1) {
  37:         fprintf(stderr, "Can't monitor libevent notify pipe\n");
  38:         exit(1);
  39:     }
  40:  
  41:     /* Initialize an empty connection queue (CQ) */
  42:     me->new_conn_queue = malloc(sizeof(struct conn_queue));
  43:     if (me->new_conn_queue == NULL) {
  44:         perror("Failed to allocate memory for connection queue");
  45:         exit(EXIT_FAILURE);
  46:     }
  47:     cq_init(me->new_conn_queue);
  48:  
  49:     if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
  50:         perror("Failed to initialize mutex");
  51:         exit(EXIT_FAILURE);
  52:     }
  53:  
  54:     me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
  55:                                     NULL, NULL);
  56:     if (me->suffix_cache == NULL) {
  57:         fprintf(stderr, "Failed to create suffix cache\n");
  58:         exit(EXIT_FAILURE);
  59:     }
  60: }

setup_thread()主要创建一个event事件,监听之前pipe出来的读管道,并设置触发回调thread_libevent_process。

接着初始化连接队列CQ,CQ是个双链表,结构如下:

   1: /* An item in the connection queue. */
   2: typedef struct conn_queue_item CQ_ITEM;
   3: struct conn_queue_item {
   4:     int               sfd;
   5:     enum conn_states  init_state;
   6:     int               event_flags;            /* libevent flags */
   7:     int               read_buffer_size;        /* DATA_BUFFER_SIZE=2048 */
   8:     enum network_transport     transport;    /* socket transport protocol */
   9:     CQ_ITEM          *next;
  10: };
  11:  
  12: /* A connection queue. */
  13: /* queue of pending socket connections */
  14: typedef struct conn_queue CQ;
  15: struct conn_queue {
  16:     CQ_ITEM *head;
  17:     CQ_ITEM *tail;
  18:     pthread_mutex_t lock;
  19:     pthread_cond_t  cond;
  20: };

设置完工作线程句柄后,就是创建它们了
   1: /* Create threads after we've done all the libevent setup.
   2:  * Spawn the nthreads worker threads.
   3:  */
   4: for (i = 0; i < nthreads; i++) {
   5:     create_worker(worker_libevent, &threads[i]);
   6: }
   7:  
   8: ......
   9:  
  10: /*
  11:  * Worker thread: main event loop
  12:  */
  13: static void *worker_libevent(void *arg) {
  14:     LIBEVENT_THREAD *me = arg;
  15:  
  16:     /* Any per-thread setup can happen here; thread_init() will block until
  17:      * all threads have finished initializing.
  18:      */
  19:  
  20:     /* set an indexable thread-specific memory item for the lock type.
  21:      * this could be unnecessary if we pass the conn *c struct through
  22:      * all item_lock calls...
  23:      */
  24:     me->item_lock_type = ITEM_LOCK_GRANULAR;
  25:     pthread_setspecific(item_lock_type_key, &me->item_lock_type);
  26:  
  27:     /* Register this thread: notify the main thread it was created successfully */
  28:     register_thread_initialized();
  29:  
  30:     /* Enter the libevent loop and block waiting for events */
  31:     event_base_loop(me->base, 0);
  32:     return NULL;
  33: }

传递的是之前的LIBEVENT_THREAD。register_thread_initialized是用于通知子线程自己已就绪,主线程用wait_for_thread_registration来等待所有子线程创建完成。工作线程在最后event_base_loop阻塞监听事件了。

2. 网络监听

完成前面的工作之后,就是网络监听部分了。

   1: /* create the listening socket, bind it, and init */
   2: if (settings.socketpath == NULL) {
   3:     const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
   4:     char temp_portnumber_filename[PATH_MAX];
   5:     FILE *portnumber_file = NULL;
   6:  
   7:     if (portnumber_filename != NULL) {
   8:         snprintf(temp_portnumber_filename,
   9:                  sizeof(temp_portnumber_filename),
  10:                  "%s.lck", portnumber_filename);
  11:  
  12:         portnumber_file = fopen(temp_portnumber_filename, "a");
  13:         if (portnumber_file == NULL) {
  14:             fprintf(stderr, "Failed to open \"%s\": %s\n",
  15:                     temp_portnumber_filename, strerror(errno));
  16:         }
  17:     }
  18:  
  19:     errno = 0;
  20:     /* 1) Set up the TCP listening sockets */
  21:     if (settings.port && server_sockets(settings.port, tcp_transport,
  22:                                        portnumber_file)) {
  23:         vperror("failed to listen on TCP port %d", settings.port);
  24:         exit(EX_OSERR);
  25:     }
  26:  
  27:     /*
  28:      * initialization order: first create the listening sockets
  29:      * (may need root on low ports), then drop root if needed,
  30:      * then daemonise if needed, then init libevent (in some cases
  31:      * descriptors created by libevent wouldn't survive forking).
  32:      */
  33:  
  34:     /* create the UDP listening socket and bind it */
  35:     /* 2) Set up the UDP sockets */
  36:     errno = 0;
  37:     if (settings.udpport && server_sockets(settings.udpport, udp_transport,
  38:                                           portnumber_file)) {
  39:         vperror("failed to listen on UDP port %d", settings.udpport);
  40:         exit(EX_OSERR);
  41:     }
  42:  
  43:     if (portnumber_file) {
  44:         fclose(portnumber_file);
  45:         rename(temp_portnumber_filename, portnumber_filename);
  46:     }
  47: }

下面我们只分析TCP,UDP类同。
   1: /**
   2:  * Create a socket and bind it to a specific port number
   3:  * @param interface the interface to bind to
   4:  * @param port the port number to bind to
   5:  * @param transport the transport protocol (TCP / UDP)
   6:  * @param portnumber_file A filepointer to write the port numbers to
   7:  *        when they are successfully added to the list of ports we
   8:  *        listen on.
   9:  */
  10: static int server_socket(const char *interface,
  11:                          int port,
  12:                          enum network_transport transport,
  13:                          FILE *portnumber_file) {
  14:     int sfd;
  15:     struct linger ling = {0, 0};
  16:     struct addrinfo *ai;
  17:     struct addrinfo *next;
  18:     struct addrinfo hints = { .ai_flags = AI_PASSIVE,
  19:                               .ai_family = AF_UNSPEC };
  20:     char port_buf[NI_MAXSERV];
  21:     int error;
  22:     int success = 0;
  23:     int flags =1;
  24:  
  25:     hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
  26:  
  27:     if (port == -1) {
  28:         port = 0;
  29:     }
  30:     snprintf(port_buf, sizeof(port_buf), "%d", port);
  31:     error= getaddrinfo(interface, port_buf, &hints, &ai);
  32:     if (error != 0) {
  33:         if (error != EAI_SYSTEM)
  34:           fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
  35:         else
  36:           perror("getaddrinfo()");
  37:         return 1;
  38:     }
  39:  
  40:     for (next= ai; next; next= next->ai_next) {
  41:         conn *listen_conn_add;
  42:         /* Create the socket and set it non-blocking */
  43:         if ((sfd = new_socket(next)) == -1) {
  44:             /* getaddrinfo can return "junk" addresses,
  45:              * we make sure at least one works before erroring.
  46:              * (at least one address must be usable)
  47:              */
  48:             if (errno == EMFILE) {
  49:                 /* ...unless we're out of fds */
  50:                 perror("server_socket");
  51:                 exit(EX_OSERR);
  52:             }
  53:             continue;
  54:         }
  55:  
  56: #ifdef IPV6_V6ONLY
  57:         if (next->ai_family == AF_INET6) {
  58:             error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
  59:             if (error != 0) {
  60:                 perror("setsockopt");
  61:                 close(sfd);
  62:                 continue;
  63:             }
  64:         }
  65: #endif
  66:  
  67:         setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
  68:         if (IS_UDP(transport)) {
  69:             maximize_sndbuf(sfd);
  70:         } else {
  71:             error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
  72:             if (error != 0)
  73:                 perror("setsockopt");
  74:  
  75:             error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
  76:             if (error != 0)
  77:                 perror("setsockopt");
  78:  
  79:             error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
  80:             if (error != 0)
  81:                 perror("setsockopt");
  82:         }
  83:  
  84:         if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
  85:             if (errno != EADDRINUSE) {
  86:                 perror("bind()");
  87:                 close(sfd);
  88:                 freeaddrinfo(ai);
  89:                 return 1;
  90:             }
  91:             close(sfd);
  92:             continue;
  93:         } else {
  94:             success++;
  95:             if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
  96:                 perror("listen()");
  97:                 close(sfd);
  98:                 freeaddrinfo(ai);
  99:                 return 1;
 100:             }
 101:             if (portnumber_file != NULL &&
 102:                 (next->ai_addr->sa_family == AF_INET ||
 103:                  next->ai_addr->sa_family == AF_INET6)) {
 104:                 union {
 105:                     struct sockaddr_in in;
 106:                     struct sockaddr_in6 in6;
 107:                 } my_sockaddr;
 108:                 socklen_t len = sizeof(my_sockaddr);
 109:                 if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
 110:                     if (next->ai_addr->sa_family == AF_INET) {
 111:                         fprintf(portnumber_file, "%s INET: %u\n",
 112:                                 IS_UDP(transport) ? "UDP" : "TCP",
 113:                                 ntohs(my_sockaddr.in.sin_port));
 114:                     } else {
 115:                         fprintf(portnumber_file, "%s INET6: %u\n",
 116:                                 IS_UDP(transport) ? "UDP" : "TCP",
 117:                                 ntohs(my_sockaddr.in6.sin6_port));
 118:                     }
 119:                 }
 120:             }
 121:         }
 122:  
 123:         if (IS_UDP(transport)) {    /* UDP transport */
 124:             int c;
 125:  
 126:             for (c = 0; c < settings.num_threads_per_udp; c++) {
 127:                 /* this is guaranteed to hit all threads because we round-robin */
 128:                 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
 129:                                   UDP_READ_BUFFER_SIZE, transport);
 130:             }
 131:         } else {    /* TCP transport */
 132:             /* Wrap sfd in a listening conn and register its event */
 133:             if (!(listen_conn_add = conn_new(sfd, conn_listening,
 134:                                              EV_READ | EV_PERSIST, 1,
 135:                                              transport, main_base))) {
 136:                 fprintf(stderr, "failed to create listening connection\n");
 137:                 exit(EXIT_FAILURE);
 138:             }
 139:             /* Prepend to the TCP listening-connection list */
 140:             listen_conn_add->next = listen_conn;
 141:             listen_conn = listen_conn_add;
 142:         }
 143:     }
 144:  
 145:     freeaddrinfo(ai);
 146:  
 147:     /* Return zero iff we detected no errors in starting up connections */
 148:     return success == 0;
 149: }

首先解析每一个主机名,用new_socket()创建非阻塞方式的sfd,然后bind。
接着是conn_new()。

   1: conn *conn_new(const int sfd, enum conn_states init_state,
   2:                 const int event_flags,
   3:                 const int read_buffer_size, enum network_transport transport,
   4:                 struct event_base *base) {
   5:     conn *c = conn_from_freelist();    /* grab a free connection, if any */
   6:  
   7:     if (NULL == c) {
   8:         if (!(c = (conn *)calloc(1, sizeof(conn)))) {
   9:             fprintf(stderr, "calloc()\n");
  10:             return NULL;
  11:         }
  12:         MEMCACHED_CONN_CREATE(c);
  13:  
  14:         c->rbuf = c->wbuf = 0;
  15:         c->ilist = 0;
  16:         c->suffixlist = 0;
  17:         c->iov = 0;
  18:         c->msglist = 0;
  19:         c->hdrbuf = 0;
  20:  
  21:         c->rsize = read_buffer_size;
  22:         c->wsize = DATA_BUFFER_SIZE;
  23:         c->isize = ITEM_LIST_INITIAL;
  24:         c->suffixsize = SUFFIX_LIST_INITIAL;
  25:         c->iovsize = IOV_LIST_INITIAL;
  26:         c->msgsize = MSG_LIST_INITIAL;
  27:         c->hdrsize = 0;
  28:  
  29:         c->rbuf = (char *)malloc((size_t)c->rsize);
  30:         c->wbuf = (char *)malloc((size_t)c->wsize);
  31:         c->ilist = (item **)malloc(sizeof(item *) * c->isize);
  32:         c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
  33:         c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
  34:         c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
  35:  
  36:         if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
  37:                 c->msglist == 0 || c->suffixlist == 0) {
  38:             conn_free(c);
  39:             fprintf(stderr, "malloc()\n");
  40:             return NULL;
  41:         }
  42:  
  43:         STATS_LOCK();
  44:         stats.conn_structs++;    /* one more conn struct in the pool */
  45:         STATS_UNLOCK();
  46:     }
  47:  
  48:     c->transport = transport;
  49:     c->protocol = settings.binding_protocol;
  50:  
  51:     /* unix socket mode doesn't need this, so zeroed out.  but why
  52:      * is this done for every command?  presumably for UDP
  53:      * mode.  */
  54:     if (!settings.socketpath) {
  55:         c->request_addr_size = sizeof(c->request_addr);
  56:     } else {
  57:         c->request_addr_size = 0;
  58:     }
  59:  
  60:     if (settings.verbose > 1) {
  61:         if (init_state == conn_listening) {
  62:             fprintf(stderr, "<%d server listening (%s)\n", sfd,
  63:                 prot_text(c->protocol));
  64:         } else if (IS_UDP(transport)) {
  65:             fprintf(stderr, "<%d server listening (udp)\n", sfd);
  66:         } else if (c->protocol == negotiating_prot) {
  67:             fprintf(stderr, "<%d new auto-negotiating client connection\n",
  68:                     sfd);
  69:         } else if (c->protocol == ascii_prot) {
  70:             fprintf(stderr, "<%d new ascii client connection.\n", sfd);
  71:         } else if (c->protocol == binary_prot) {
  72:             fprintf(stderr, "<%d new binary client connection.\n", sfd);
  73:         } else {
  74:             fprintf(stderr, "<%d new unknown (%d) client connection\n",
  75:                 sfd, c->protocol);
  76:             assert(false);
  77:         }
  78:     }
  79:  
  80:     c->sfd = sfd;
  81:     c->state = init_state;
  82:     c->rlbytes = 0;
  83:     c->cmd = -1;
  84:     c->rbytes = c->wbytes = 0;
  85:     c->wcurr = c->wbuf;
  86:     c->rcurr = c->rbuf;
  87:     c->ritem = 0;
  88:     c->icurr = c->ilist;
  89:     c->suffixcurr = c->suffixlist;
  90:     c->ileft = 0;
  91:     c->suffixleft = 0;
  92:     c->iovused = 0;
  93:     c->msgcurr = 0;
  94:     c->msgused = 0;
  95:  
  96:     c->write_and_go = init_state;
  97:     c->write_and_free = 0;
  98:     c->item = 0;
  99:  
 100:     c->noreply = false;
 101:  
 102:     /* Register the libevent event for this connection */
 103:     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
 104:     event_base_set(base, &c->event);
 105:     c->ev_flags = event_flags;
 106:  
 107:     if (event_add(&c->event, 0) == -1) {
 108:         /* On failure, return the connection to the freelist */
 109:         if (conn_add_to_freelist(c)) {
 110:             conn_free(c);
 111:         }
 112:         perror("event_add");
 113:         return NULL;
 114:     }
 115:  
 116:     STATS_LOCK();
 117:     stats.curr_conns++;
 118:     stats.total_conns++;
 119:     STATS_UNLOCK();
 120:  
 121:     MEMCACHED_CONN_ALLOCATE(c->sfd);
 122:  
 123:     return c;
 124: }

进入函数首先通过conn_from_freelist()取得空闲的conn,没有则新建,设置一些字段。

设置event事件,监听socket,并将该连接事件添加到main_base主事件。触发回调是event_handler。里面就是核心的状态机drive_machine()。

阅读(1079) | 评论(0) | 转发(0) |
0

上一篇:内核链表说明

下一篇:没有了

给主人留下些什么吧!~~