while (1)
{
socklen_t sock, size;
struct sockaddr_in client;
size = sizeof (client);
sock = accept (main_socket, (struct sockaddr *) &client, &size); if (sock < 0)
{
perror ("Error accepting on main socket");
main_exit (EXIT_FAILURE);
}
handle_request_thread (client, sock); /*线程处理socket*/
}
---------------------------------------------------------------------------------------thread_mgr.c
/*线程入口函数*/
static void *do_thread (void *arg) {
thread_item_t *t = (thread_item_t *)arg;
struct sockaddr_in client; int sock;
log_printf (ERROR_LOG, LOG_LEVEL_INFO, "Thread number %d has started\n", t - threads);
do {
sem_wait (&waiting_for_request); /* 精妙之处定义信号量等待 */
client = req_client; sock = req_sock;
log_printf (ERROR_LOG, LOG_LEVEL_INFO, "Thread number %d is handling request\n", t - threads);
sem_post (&waiting_for_handle); /*同步主线程,让主线程继续*/
handle_request (client, sock, t->buffer, t->buffer_len);/*具体处理*/
log_printf (ERROR_LOG, LOG_LEVEL_INFO, "Thread number %d closed connection\n", t - threads);
} while (!t->temp); /*此处目的为了让临时线程执行一次退出,临时线程是从最小到最大线程数之间线程*/
if (t->buffer) free (t->buffer);
log_printf (ERROR_LOG, LOG_LEVEL_INFO, "Thread number %d is terminating\n", t - threads);
return NULL;
}
/* 创建线程对象*/
int init_threads (void) {
int i;
if (pthread_mutex_init (&log_mutex, NULL)) return -1;
if (sem_init (&waiting_for_request, 0, 0)) return -1;
if (sem_init (&waiting_for_handle, 0, 0)) return -1;
threads = malloc (sizeof (thread_item_t) * server_config.max_threads);
for (i = 0; i < server_config.max_threads; i++) {
threads[i].free = 1;
}
/* 生成最少线程数目 */
for (i = 0; i < server_config.min_threads; i++) {
if (pthread_create (&threads[i].th, NULL, do_thread, &threads[i])) return -1;
threads[i].free = threads[i].temp = 0;/*线程为非临时线程*/
threads[i].buffer_len = BUFFER_LEN;
if (!(threads[i].buffer = malloc (threads[i].buffer_len))) return -1;
}
return 0;
}
/*主线程进入函数*/
void handle_request_thread (struct sockaddr_in client, int sock) {
int val, i;
req_client = client; req_sock = sock;
sem_post (&waiting_for_request);/*通过信号量让工作线程工作*/
sem_getvalue (&waiting_for_request, &val);
if (val > 0) { /*如果还有未处理的增加工作线程,此处精妙,动态控制线程数目*/
for (i = server_config.min_threads; i < server_config.max_threads; i++) {
if (threads[i].free) {
threads[i].buffer_len = BUFFER_LEN;
if (!(threads[i].buffer = malloc (threads[i].buffer_len))) break;
threads[i].temp = 1; /*此时生成的线程为临时线程*/
if (pthread_create (&threads[i].th, NULL, do_thread, &threads[i])) break;
threads[i].free = 0;
break;
}
}
}
sem_wait (&waiting_for_handle);/*有工作线程接收工作才可以退出*/
}
|