分类: LINUX
2008-11-16 23:49:00
typedef struct server_st *server_t; typedef struct task_queue_st *task_queue_t; typedef struct conn_st *conn_t; /** 服务器结构 */ struct server_st { /** 监听地址 */ struct in_addr listen_addr; /** 监听端口 */ int port; /** 监听描述符 */ int listen_fd; /**负责读取请求的任务队列,一个线程负责处理一个任务队列,一共五个*/ queue_t readreq_task_queues[5]; /**负责写数据的任务队列,一个线程负责处理一个任务队列,一共五个*/ queue_t writedata_task_queues[5]; }; /** 任务队列结构 */ struct task_queue_st { /** 待添加的连接 */ queue_t pending_conn_queue; /** 互斥锁,用于多线程环境下对任务队列的访问控制 */ pthread_mutex_t task_queue_mutex; /** 链表结构,保存任务队列中的所有连接 */ conn_t conn_head; /** 链表的最后一个节点,用于方便在结尾插入新的节点 */ conn_t conn_tail; /** 总连接数*/ int total_conns; }; /** 用户的阶段状态 */ typedef enum { status_ESTABLISHED,/** 客户端刚建立连接 */ status_CONNECTED,/** 客户端正在连接 */ status_START_RECV_DATA,/** 客户端开始接收数据,即表示服务器端开始发送数据 */ status_CLOSED/** 客户端关闭连接 */ } client_status_t; /** 服务器的阶段状态 */ typedef enum { status_SERVER_CONNECTED,/** 与客户端建立连接 */ status_SERVER_CLOSING /** 服务器关闭连接 */ } server_status_t; /** 连接结构 */ struct conn_st { /** 该连接的所属服务器 */ server_t server; /** 该连接的引用计数 */ int ref_count; /** 用户最后一次活动的时间*/ time_t last_activity; /** 客户端当前的状态阶段*/ client_status_t client_status; /** 服务器当前的状态阶段*/ server_status_t server_status; /** socket描述符*/ int fd; /** 数据缓冲,当服务器端需要不断的发送数据给客户端时被用到 */ char *remain_data; /** 数据缓冲剩余数据大小 */ int remain_data_size; /** 下一个连接 */ conn_t readreq_task_queue_next; /** 下一个连接 */ conn_t writedata_task_queue_next; }; /** 主函数 */ int main(int argc, char **argv) { int idx, conn_fd; server_t server; server = must_calloc(1, sizeof(struct server_st)); server->port = 端口号; server->listen_addrs.s_addr = INADDR_ANY; server->listen_fd = 服务器监听(&server->listen_addr,server->port); 设置成非堵塞(server->listen_fd); /* 创建5个负责读取请求的任务线程 */ for (idx = 0; idx < 5; idx++) { task_queue_t task_queue = server->readreq_task_queues[idx]; bzero(task_queue,sizeof(struct task_queue_st)); task_queue->pending_conn_queue = queue_new(); pthread_mutex_init(&task_queue->task_queue_mutex,NULL); pthread_create(&task_queue->thread_tid, NULL, readreq_task_thread, task_queue); } /* 创建5个负责写数据的任务线程 */ for (idx = 0; idx < 5; idx++) { task_queue_t task_queue = server->writedata_task_queues[idx]; bzero(task_queue,sizeof(struct task_queue_st)); task_queue->pending_conn_queue = queue_new(); pthread_mutex_init(&task_queue->task_queue_mutex,NULL); pthread_create(&task_queue->thread_tid, NULL, writedata_task_thread, task_queue); } /* 无限循环的监听,等待客户端的连接请求 */ while (1) { conn_fd = accept(server->listen_fd, 0, 0); if (conn_fd < 0) continue; /* 和一个客户端建立了连接,下面开始把该连接分配给负责读取请求的任务线程*/ assign_to_readreq_task_thread(server,conn_fd); } return 0; } /*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/ static void assign_to_readreq_task_thread(server_t server, int conn_fd) { int idx; task_queue_t task_queue, min_task_queue = NULL; conn_t conn; /* 查找连接数最少的任务队列 */ for (idx = 0; idx < 5; idx++) { task_queue = server->readreq_task_queues[idx]; if (min_task_queue == NULL) min_task_queue = task_queue; else if (min_task_queue->total_conns > task_queue->total_conns) min_task_queue = task_queue; } if (min_task_queue) { conn = must_calloc(1, sizeof(struct conn_st)); conn->server = server; conn->client_status = status_ESTABLISHED; conn->server_status = status_SERVER_CONNECTED; conn->ref_count++;/*累加引用计数*/ /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/ pthread_mutex_lock(&min_task_queue->task_queue_mutex); queue_push(min_task_queue->pending_conn_queue, conn, 1); min_task_queue->total_conns++; pthread_mutex_unlock(&min_task_queue->task_queue_mutex); } } /** *负责读取请求的任务线程处理主函数,负责管理,处理自己队列中的所有连接 */ static void *readreq_task_thread(void *arg) { int epfd = epoll_create(20480); struct epoll_event ev,happened_ev[128]; task_queue_t task_queue = (task_queue_t) arg; int ready,i; while (1) { time_t curtime = time(NULL); /*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/ conn_t conn = NULL, prev_conn = NULL, discarded_conn; /*把待添加队列里的连接添加到任务队列中*/ pthread_mutex_lock(&task_queue->task_queue_mutex); while ((conn = queue_pop(task_queue->pending_conn_queue))) { if (!task_queue->conn_head) { task_queue->conn_head = conn; } else { task_queue->conn_tail->readreq_task_queue_next = conn; } task_queue->conn_tail = conn; } pthread_mutex_unlock(&task_queue->task_queue_mutex); if (task_queue->total_conns == 0) { thread_sleep(1); continue; } conn = task_queue->conn_head; while (conn) { //清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉 boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT; if ((conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) && conn->ref_count == 1) { task_queue->total_conns--; discarded_conn = conn;//先放入临时变量中 //从链表中删除 if (prev_conn) { if (!conn->readreq_task_queue_next) { task_queue->conn_tail = prev_conn; prev_conn->readreq_task_queue_next = NULL; } else { prev_conn->readreq_task_queue_next = conn->readreq_task_queue_next; } } else { task_queue->conn_head = conn->readreq_task_queue_next; } conn = conn->readreq_task_queue_next; close(discarded_conn->fd);/*真正关闭连接的地方*/ free(discarded_conn); } else { ev.data.fd = conn->fd; ev.data.ptr = conn; /*只负责监视读事件和出错事件*/ ev.events = EPOLLIN | EPOLLET; if(conn->client_status == status_ESTABLISHED) { 设置超时时间或者设置非堵塞(conn->fd);/*两者必须选一个*/ epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev); conn->client_status = status_CONNECTED; } else { epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev); } prev_conn = conn; conn = conn->readreq_task_queue_next; } } /** 堵塞等待,超时时间为POLL_TIMEOUT毫秒*/ if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) < 0) continue; /** * 循环查找向我们发送请求的客户端的连接 */ for (i = 0; i < ready; i++) { if (happened_ev[i].events & POLLIN) { conn = (conn_t) happened_ev[i].data.ptr; conn->last_activity = time(NULL);/* 记录客户端的最后活动时间*/ proc_protocol(conn);/*处理协议函数*/ } else if(happened_ev[i].events & EPOLLHUP) { conn = (conn_t) happened_ev[i].data.ptr; conn->client_status = status_CLOSED; } } } } /* 协议处理函数 */ void proc_protocol(conn_t conn) { 某一时刻客户端要求服务器端不断发送数据给客户端,然后设置了conn->client_status = status_START_RECV_DATA,并且调用了下面的move_to_writedata_task_thread(conn); } /*分配任务,原则为哪个线程的任务队列最少就分配给哪一个线程去处理*/ static void move_to_writedata_task_thread(conn_t conn) { server_t server = conn->server; task_queue_t task_queue, min_task_queue= NULL; int idx; /* 查找连接数最少的任务队列 */ for (idx = 0; idx < 5; idx++) { task_queue = server->writedata_task_queues[idx]; if (min_task_queue == NULL) min_task_queue = task_queue; else if (min_task_queue->total_conns > task_queue->total_conns) min_task_queue = task_queue; } if (min_task_queue) { conn->ref_count++;/*累加引用计数*/ /*添加到该任务队列中的待添加队列并且累加该任务队列里的连接数*/ pthread_mutex_lock(&min_task_queue->task_queue_mutex); queue_push(min_task_queue->pending_conn_queue, conn, 1); min_task_queue->total_conns++; pthread_mutex_unlock(&min_task_queue->task_queue_mutex); } } /** *负责写数据的线程处理主函数,负责管理,处理自己队列中的所有连接 */ static void *writedata_task_thread(void *arg) { task_queue_t task_queue = (task_queue_t) arg; int epfd = epoll_create(20480); struct epoll_event ev,happened_ev[128]; int ready,i,nwrite,nread,sndbuf_size = 10240; char buf[sndbuf_size]; while (1) { time_t curtime = time(NULL); /*下面三个变量遍历链表时使用,conn是当前连接,prev_conn是先前的连接,discarded_conn是丢弃的连接*/ conn_t conn = NULL, prev_conn = NULL, discarded_conn; /*把待添加队列里的连接添加到任务队列中*/ pthread_mutex_lock(&task_queue->task_queue_mutex); while ((conn = queue_pop(task_queue->pending_conn_queue))) { if (!task_queue->conn_head) { task_queue->conn_head = conn; } else { task_queue->conn_tail->writedata_task_queue_next = conn; } task_queue->conn_tail = conn; } pthread_mutex_unlock(&task_queue->task_queue_mutex); if (task_queue->total_conns == 0) { thread_sleep(1); continue; } conn = task_queue->conn_head; while (conn) { //清理已经关闭的连接,把已经关闭的连接和超时的连接从链表中删除掉 boolean conn_timeout = conn->last_activity > 0 && curtime - conn->last_activity > CLIENT_TIMEOUT; if (conn->client_status == status_CLOSED || conn->server_status == status_SERVER_CLOSING || conn_timeout) { task_queue->total_conns--; discarded_conn = conn;//放入临时变量中 //从链表中删除 if (prev_conn) { if (!conn->writedata_task_queue_next) { task_queue->conn_tail = prev_conn; prev_conn->writedata_task_queue_next = NULL; } else { prev_conn->writedata_task_queue_next = conn->writedata_task_queue_next; } } else { task_queue->conn_head = conn->writedata_task_queue_next; } conn = conn->writedata_task_queue_next; discarded_conn->ref_count--;/*引用计数减一,并且该连接已经关闭了,那么真正关闭连接的任务就交给readreq_task_thread函数了*/ } else { ev.data.fd = conn->fd; ev.data.ptr = conn; /*只负责监视写事件和出错事件*/ ev.events = EPOLLOUT | EPOLLET; if(conn->client_status == status_START_RECV_DATA) { setsockopt(conn->fd, SOL_SOCKET, SO_SNDBUF, (char *)&sndbuf_size, sizeof(int)); epoll_ctl(epfd,EPOLL_CTL_ADD,conn->fd,&ev); conn->client_status = status_CONNECTED; } else { epoll_ctl(epfd,EPOLL_CTL_MOD,conn->fd,&ev); } prev_conn = conn; conn = conn->writedata_task_queue_next; } } /**堵塞等待,超时时间为POLL_TIMEOUT毫秒*/ if ((ready = epoll_wait(epfd, happened_ev,sizeof(happened_ev) / sizeof(struct epoll_event), POLL_TIMEOUT)) < 0) continue; /** * 循环查找向我们发送请求的客户端的连接 */ for (i = 0; i < ready; i++) { if (happened_ev[i].events & EPOLLOUT) { conn = (conn_t) happened_ev[i].data.ptr; conn->last_activity = time(NULL);/* 记录客户端的最后活动时间*/ if(conn->remain_data_size == 0) {/*如果数据缓存已经没有任何数据了的话*/ if((nread = read(文件描述符,buf,sizeof(buf))) > 0) {/*从硬盘读数据出来到数据缓存*/ if((nwrite = write(conn->fd,buf,nread)) < 0 && errno != EINTR) { conn->client_status = status_CLOSED; continue; } if(nwrite < nread) { conn->remain_data_size = nread - nwrite; conn->remain_data = must_malloc(conn->remain_data_size); memcpy(conn->remain_data,buf + nwrite, conn->remain_data_size); } } else if(nread == 0) { conn->server_status = status_SERVER_CLOSING; } } else { /*写数据缓存里的数据到客户端*/ if((nwrite = write(conn->fd,conn->remain_data,conn->remain_data_size)) < 0 && errno != EINTR) { conn->client_status = status_CLOSED; free(conn->remain_data); continue; } if(nwrite < conn->remain_data_size) { conn->remain_data_size = conn->remain_data_size - nwrite; memcpy(buf, conn->remain_data + nwrite, conn->remain_data_size); conn->remain_data = must_realloc(conn->remain_data, conn->remain_data_size); memcpy(conn->remain_data, buf, conn->remain_data_size); } else { free(conn->remain_data); conn->remain_data = NULL; conn->remain_data_size = 0; } } } else if(happened_ev[i].events & EPOLLHUP) { conn = (conn_t) happened_ev[i].data.ptr; conn->client_status = status_CLOSED; } } } }
转自linuxsir.org
作者:lijiuwei
邮箱:lijiuwei0902@gmail.com