Category: NOSQL
2013-05-24 17:39:06
In main(), the thread subsystem is initialized with:

thread_init(settings.num_threads, main_base);
/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = hashsize(power);

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    pthread_key_create(&item_lock_type_key, NULL);
    pthread_mutex_init(&item_global_lock, NULL);

    /* Allocate the descriptors for the nthreads worker threads */
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        /* Set up each worker thread's state */
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup.
     * Spawn the nthreads worker threads.
     */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning.
     * Block until every worker_libevent thread has finished initializing.
     */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}
This function creates and configures the worker threads.
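One detail worth noting in the lock-table sizing above: hashsize() is a power-of-two macro, so power = 13 yields exactly the 8192 item-lock buckets mentioned in the comment. A sketch of its definition (the exact integer type in memcached's hash headers may differ):

#define hashsize(n) ((unsigned long int)1 << (n))  /* 2^n buckets */
/* power = 13  ->  hashsize(13) == 8192 item locks */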
LIBEVENT_THREAD is the per-thread handle; its comments speak for themselves. The structure:

typedef struct {
    pthread_t thread_id;               /* unique ID of this thread */
    struct event_base *base;           /* libevent handle this thread uses */
    struct event notify_event;         /* listen event for notify pipe */
    int notify_receive_fd;             /* receiving end of notify pipe */
    int notify_send_fd;                /* sending end of notify pipe */
    struct thread_stats stats;         /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;             /* suffix cache */
    uint8_t item_lock_type;            /* use fine-grained or global item lock */
} LIBEVENT_THREAD;
Each worker's handle is then initialized: a notify pipe is created for it, and setup_thread() installs its events, among other things.

for (i = 0; i < nthreads; i++) {
    int fds[2];
    if (pipe(fds)) {
        perror("Can't create notify pipe");
        exit(1);
    }

    threads[i].notify_receive_fd = fds[0];
    threads[i].notify_send_fd = fds[1];

    /* Set up each worker thread's state */
    setup_thread(&threads[i]);
    /* Reserve three fds for the libevent base, and two for the pipe */
    stats.reserved_fds += 5;
}

......

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads
     * (watch the read end of the notify pipe)
     */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    /* Initialize an empty connection queue (CQ) */
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
setup_thread() mainly creates an event that watches the read end of the pipe created above and registers thread_libevent_process() as the trigger callback.
It also initializes the connection queue CQ. CQ is a mutex-protected singly linked queue (each CQ_ITEM points only to its successor; head and tail live in the queue header):
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;
    enum conn_states init_state;
    int event_flags;                  /* libevent flags */
    int read_buffer_size;             /* DATA_BUFFER_SIZE = 2048 */
    enum network_transport transport; /* socket protocol */
    CQ_ITEM *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};
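The operations on CQ are a textbook mutex-protected queue. A minimal sketch along the lines of memcached's thread.c (cq_pop() here is the non-blocking variant the workers use):

static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Append an item at the tail and wake any waiter */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_cond_signal(&cq->cond);
    pthread_mutex_unlock(&cq->lock);
}

/* Detach and return the head item, or NULL if the queue is empty */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}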
With the worker handles set up, the next step is creating the threads themselves:
/* Create threads after we've done all the libevent setup.
 * Spawn the nthreads worker threads.
 */
for (i = 0; i < nthreads; i++) {
    create_worker(worker_libevent, &threads[i]);
}

......

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);

    /* Register this thread: signal that it started successfully */
    register_thread_initialized();

    /* Enter the libevent loop and wait for events */
    event_base_loop(me->base, 0);
    return NULL;
}
The argument passed in is the LIBEVENT_THREAD prepared earlier. register_thread_initialized() is how each child thread announces that it is ready, while the main thread waits in wait_for_thread_registration() until every child has checked in. Finally, each worker blocks in event_base_loop(), listening for events.
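The registration handshake is just a counter guarded by the init_lock/init_cond pair initialized in thread_init(). A sketch of the two helpers (assuming an init_count counter as in memcached's thread.c; note that wait_for_thread_registration() is called with init_lock already held):

static int init_count = 0;

/* Called by the main thread, with init_lock held */
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

/* Called by each worker once its setup is complete */
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
}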
With that groundwork done, we reach the network-listening part.
/* create the listening socket, bind it, and init */
if (settings.socketpath == NULL) {
    const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
    char temp_portnumber_filename[PATH_MAX];
    FILE *portnumber_file = NULL;

    if (portnumber_filename != NULL) {
        snprintf(temp_portnumber_filename,
                 sizeof(temp_portnumber_filename),
                 "%s.lck", portnumber_filename);

        portnumber_file = fopen(temp_portnumber_filename, "a");
        if (portnumber_file == NULL) {
            fprintf(stderr, "Failed to open \"%s\": %s\n",
                    temp_portnumber_filename, strerror(errno));
        }
    }

    errno = 0;
    /* 1) Set up the TCP listener */
    if (settings.port && server_sockets(settings.port, tcp_transport,
                                        portnumber_file)) {
        vperror("failed to listen on TCP port %d", settings.port);
        exit(EX_OSERR);
    }

    /*
     * initialization order: first create the listening sockets
     * (may need root on low ports), then drop root if needed,
     * then daemonise if needed, then init libevent (in some cases
     * descriptors created by libevent wouldn't survive forking).
     */

    /* create the UDP listening socket and bind it */
    /* 2) Set up the UDP listener */
    errno = 0;
    if (settings.udpport && server_sockets(settings.udpport, udp_transport,
                                           portnumber_file)) {
        vperror("failed to listen on UDP port %d", settings.udpport);
        exit(EX_OSERR);
    }

    if (portnumber_file) {
        fclose(portnumber_file);
        rename(temp_portnumber_filename, portnumber_filename);
    }
}
Below we analyze only the TCP path; UDP is analogous.
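server_sockets() (plural) is only a thin wrapper: it splits the settings.inter interface list and calls server_socket() once per interface. A simplified sketch (tokenizing details and error handling abbreviated from memcached.c):

static int server_sockets(int port, enum network_transport transport,
                          FILE *portnumber_file) {
    char *b, *p;
    int ret = 0;
    char *list;

    if (settings.inter == NULL) {
        /* No interface list: bind the wildcard address once */
        return server_socket(settings.inter, port, transport, portnumber_file);
    }

    /* Otherwise bind each interface in the separated list */
    list = strdup(settings.inter);
    if (list == NULL)
        return 1;
    for (p = strtok_r(list, ";,", &b); p != NULL; p = strtok_r(NULL, ";,", &b)) {
        ret |= server_socket(p, port, transport, portnumber_file);
    }
    free(list);
    return ret;
}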
/**
 * Create a socket and bind it to a specific port number
 * @param interface the interface to bind to
 * @param port the port number to bind to
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error = getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
            perror("getaddrinfo()");
        return 1;
    }

    for (next = ai; next; next = next->ai_next) {
        conn *listen_conn_add;
        /* Create the socket and make it non-blocking */
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

#ifdef IPV6_V6ONLY
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
        }

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len) == 0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) { /* UDP */
            int c;

            for (c = 0; c < settings.num_threads_per_udp; c++) {
                /* this is guaranteed to hit all threads because we round-robin */
                dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else { /* TCP */
            /* Wrap sfd in a conn and register its event */
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            /* Prepend to the list of TCP listening connections */
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}
The function first resolves the host name with getaddrinfo(), then for each returned address creates an sfd via new_socket(), which also puts it into non-blocking mode, and binds it.
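For reference, new_socket() is little more than socket() plus an fcntl() that sets O_NONBLOCK, matching the comment in the loop above. A sketch:

static int new_socket(struct addrinfo *ai) {
    int sfd;
    int flags;

    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
        return -1;
    }

    /* Put the descriptor into non-blocking mode */
    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(sfd);
        return -1;
    }
    return sfd;
}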
Next comes conn_new().

conn *conn_new(const int sfd, enum conn_states init_state,
               const int event_flags,
               const int read_buffer_size, enum network_transport transport,
               struct event_base *base) {
    conn *c = conn_from_freelist(); /* try to reuse a connection from the freelist */

    if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            fprintf(stderr, "calloc()\n");
            return NULL;
        }
        MEMCACHED_CONN_CREATE(c);

        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;

        c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;

        c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);

        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
            c->msglist == 0 || c->suffixlist == 0) {
            conn_free(c);
            fprintf(stderr, "malloc()\n");
            return NULL;
        }

        STATS_LOCK();
        stats.conn_structs++; /* one more allocated conn structure */
        STATS_UNLOCK();
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    /* unix socket mode doesn't need this, so zeroed out. but why
     * is this done for every command? presumably for UDP
     * mode. */
    if (!settings.socketpath) {
        c->request_addr_size = sizeof(c->request_addr);
    } else {
        c->request_addr_size = 0;
    }

    if (settings.verbose > 1) {
        if (init_state == conn_listening) {
            fprintf(stderr, "<%d server listening (%s)\n", sfd,
                    prot_text(c->protocol));
        } else if (IS_UDP(transport)) {
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        } else if (c->protocol == negotiating_prot) {
            fprintf(stderr, "<%d new auto-negotiating client connection\n",
                    sfd);
        } else if (c->protocol == ascii_prot) {
            fprintf(stderr, "<%d new ascii client connection.\n", sfd);
        } else if (c->protocol == binary_prot) {
            fprintf(stderr, "<%d new binary client connection.\n", sfd);
        } else {
            fprintf(stderr, "<%d new unknown (%d) client connection\n",
                    sfd, c->protocol);
            assert(false);
        }
    }

    c->sfd = sfd;
    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;

    c->write_and_go = init_state;
    c->write_and_free = 0;
    c->item = 0;

    c->noreply = false;

    /* Register the event for this connection */
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        /* On failure, return the connection to the freelist */
        if (conn_add_to_freelist(c)) {
            conn_free(c);
        }
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}
On entry, the function first tries conn_from_freelist() to grab an idle conn; if none is available, it allocates a fresh one and initializes its fields.
It then sets up the event watching the socket and adds it to the event base passed in (main_base for the listening connection). The trigger callback is event_handler(), whose heart is the core state machine drive_machine().
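To close the loop: when the listening socket fires, the state machine accepts the new socket and hands it to a worker through dispatch_conn_new(), whose counterpart on the worker side is the thread_libevent_process() callback registered back in setup_thread(). A simplified sketch of the pair (round-robin selection and error handling abbreviated from memcached's thread.c; cqi_new()/cqi_free() manage the cqi_freelist seen in thread_init()):

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    /* Pick the next worker in round-robin order */
    int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid;
    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    /* Wake the worker: one byte down its notify pipe */
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    /* Drain the notification byte */
    if (read(fd, buf, 1) != 1 && settings.verbose > 0)
        fprintf(stderr, "Can't read from libevent pipe\n");

    /* Pop the queued connection and register it on this thread's base */
    item = cq_pop(me->new_conn_queue);
    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (NULL == c) {
            close(item->sfd);   /* simplified: the real code also handles UDP */
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}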