1. 连接池的作用
为了提高Nginx的访问速度,Nginx使用了连接池。
连接池是一个数组,里面预先分配了很多个(根据配置文件的配置)ngx_connection_s结构。
当有客户端请求连接时,就从该数组中找到一个没有使用的ngx_connection_s,用来连接用户。
当用户close时,Nginx并不会释放对应的ngx_connection_s结构,而是把它放回空闲连接链表(free_connections),等待下一个客户端连接时复用。
2. ngx_connection_s 结构
-
struct ngx_connection_s {
-
void *data;
-
ngx_event_t *read;
-
ngx_event_t *write;
-
-
ngx_socket_t fd; // 系统为该用户分配的fd
-
// 回调函数
-
ngx_recv_pt recv; // 接收 回调函数
-
ngx_send_pt send;
-
ngx_recv_chain_pt recv_chain; // 接收chain的回调函数
-
ngx_send_chain_pt send_chain;
-
-
ngx_listening_t *listening;
-
-
off_t sent; // 已发送的字节数
-
-
ngx_log_t *log;
-
-
ngx_pool_t *pool;
-
-
struct sockaddr *sockaddr;
-
socklen_t socklen;
-
ngx_str_t addr_text;
-
-
struct sockaddr *local_sockaddr;
-
-
ngx_buf_t *buffer; // buffer
-
-
ngx_queue_t queue;
-
-
ngx_atomic_uint_t number;
-
-
ngx_uint_t requests; // 该链接 请求的次数
-
-
unsigned buffered:8;
-
-
unsigned log_error:3; /* ngx_connection_log_error_e */
-
-
unsigned unexpected_eof:1;
-
unsigned timedout:1;
-
unsigned error:1;
-
unsigned destroyed:1;
-
-
unsigned idle:1;
-
unsigned reusable:1; // 是否可以 重复使用该链接
-
unsigned close:1;
-
-
unsigned sendfile:1;
-
unsigned sndlowat:1;
-
unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
-
unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
-
};
-
-
// cycle全局数据结构
-
struct ngx_cycle_s {
-
......
-
ngx_connection_t **files; //连接文件
-
ngx_connection_t *free_connections; // 空闲连接
-
ngx_uint_t free_connection_n; // 空闲连接个数
-
-
ngx_queue_t reusable_connections_queue;//可复用 连接队列
-
-
ngx_array_t listening; //监听 socket队列[Nginx服务器可能同时监听几个端口].
-
ngx_array_t paths;
-
ngx_list_t open_files; // 打开的文件链表
-
ngx_list_t shared_memory; // 共享内存 链表
-
-
ngx_uint_t connection_n; // work_connections,配置文件中设置 "worker_connections"
-
ngx_uint_t files_n;
-
-
ngx_connection_t *connections; // 指向 ngx_connection_s 数组,即连接池
-
ngx_event_t *read_events; // 指向 读事件 数组
-
ngx_event_t *write_events;// 指向 写事件 数组
-
......
-
}
3. 连接池初始化
-
// 分配 ngx_connections_t 数组
-
static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle)
-
{
-
... ...
-
if (ngx_event_flags & NGX_USE_FD_EVENT) {
-
struct rlimit rlmt; //资源限制
-
-
if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { //RLIMIT_NOFILE,一个进程能够打开的最大fd数. ulimit -n
-
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "getrlimit(RLIMIT_NOFILE) failed");
-
return NGX_ERROR;
-
}
-
-
cycle->files_n = (ngx_uint_t) rlmt.rlim_cur;
-
-
cycle->files = ngx_calloc(sizeof(ngx_connection_t *) * cycle->files_n, cycle->log);
-
if (cycle->files == NULL) {
-
return NGX_ERROR;
-
}
-
}
-
... ...
-
// 为connections分配内存
-
cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
-
if (cycle->connections == NULL) {
-
return NGX_ERROR;
-
}
-
-
c = cycle->connections;
-
// 为read_events分配内存
-
cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log);
-
if (cycle->read_events == NULL) {
-
return NGX_ERROR;
-
}
-
-
rev = cycle->read_events;
-
for (i = 0; i < cycle->connection_n; i++) {
-
rev[i].closed = 1;
-
rev[i].instance = 1;
-
}
-
// 为write_events分配内存
-
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log);
-
if (cycle->write_events == NULL) {
-
return NGX_ERROR;
-
}
-
-
wev = cycle->write_events;
-
for (i = 0; i < cycle->connection_n; i++) {
-
wev[i].closed = 1;
-
}
-
-
i = cycle->connection_n;
-
next = NULL;
-
-
do {
-
i--;
-
// 每个connection,对应一个read_events, 一个write_events
-
c[i].data = next;
-
c[i].read = &cycle->read_events[i];
-
c[i].write = &cycle->write_events[i];
-
c[i].fd = (ngx_socket_t) -1;
-
-
next = &c[i];
-
-
} while (i);
-
-
cycle->free_connections = next;//指向connection[0]
-
cycle->free_connection_n = cycle->connection_n; //free链接数
-
-
// 给监听端口,分配资源
-
ls = cycle->listening.elts;
-
for (i = 0; i < cycle->listening.nelts; i++) {
-
//传入listen的fd,返回一个connection
-
c = ngx_get_connection(ls[i].fd, cycle->log);
-
-
if (c == NULL) {
-
return NGX_ERROR;
-
}
-
-
c->log = &ls[i].log;
-
-
c->listening = &ls[i];
-
ls[i].connection = c;
-
-
rev = c->read;
-
-
rev->log = c->log;
-
rev->accept = 1;
-
-
-
#if (NGX_HAVE_DEFERRED_ACCEPT)
-
rev->deferred_accept = ls[i].deferred_accept;
-
#endif
-
// 注册事件处理函数
-
rev->handler = ngx_event_accept;
-
// 若使用accept_mutex来解决惊群问题,直接返回
-
if (ngx_use_accept_mutex) {
-
continue;
-
}
-
-
if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
-
if (ngx_add_conn(c) == NGX_ERROR) {
-
return NGX_ERROR;
-
}
-
-
} else { // 将READ事件加入到epoll中
-
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
-
return NGX_ERROR;
-
}
-
}
-
}
-
-
return NGX_OK;
-
}
4. 从连接池中获取可用的连接资源
-
// 找到一个空闲的、可用的connection [不用去重新分配内存]
-
ngx_connection_t* ngx_get_connection(ngx_socket_t s, ngx_log_t *log)
-
{
-
ngx_uint_t instance;
-
ngx_event_t *rev, *wev;
-
ngx_connection_t *c;
-
-
// socket连接数太大
-
if (ngx_cycle->files && (ngx_uint_t) s >= ngx_cycle->files_n) {
-
ngx_log_error(NGX_LOG_ALERT, log, 0,
-
"the new socket has number %d, "
-
"but only %ui files are available",
-
s, ngx_cycle->files_n);
-
return NULL;
-
}
-
-
c = ngx_cycle->free_connections;
-
// 当free_connection中没有可以用的connection时,扫描连接池,找到一个可以用的
-
if (c == NULL) {
-
ngx_drain_connections();
-
c = ngx_cycle->free_connections;
-
}
-
-
if (c == NULL) {
-
ngx_log_error(NGX_LOG_ALERT, log, 0,
-
"%ui worker_connections are not enough",
-
ngx_cycle->connection_n);
-
-
return NULL;
-
}
-
-
ngx_cycle->free_connections = c->data; // 指向下一个connections
-
ngx_cycle->free_connection_n--; // 空闲connection数-1
-
-
if (ngx_cycle->files) {
-
ngx_cycle->files[s] = c;
-
}
-
-
rev = c->read;
-
wev = c->write;
-
-
ngx_memzero(c, sizeof(ngx_connection_t));
-
-
c->read = rev;
-
c->write = wev;
-
c->fd = s;
-
c->log = log;
-
-
instance = rev->instance;
-
-
ngx_memzero(rev, sizeof(ngx_event_t));
-
ngx_memzero(wev, sizeof(ngx_event_t));
-
-
rev->instance = !instance;
-
wev->instance = !instance;
-
-
rev->index = NGX_INVALID_INDEX;
-
wev->index = NGX_INVALID_INDEX;
-
-
rev->data = c;
-
wev->data = c;
-
-
wev->write = 1;
-
-
return c;
-
}
5. 当free_connections中没有可用的连接时,从reusable_connections_queue中找
-
static void ngx_drain_connections(void)
-
{
-
ngx_int_t i;
-
ngx_queue_t *q;
-
ngx_connection_t *c;
-
// 从可复用的链接队列里面,获取connection.
-
for (i = 0; i < 32; i++) {
-
if (ngx_queue_empty(&ngx_cycle->reusable_connections_queue)) {
-
break;
-
}
-
-
q = ngx_queue_last(&ngx_cycle->reusable_connections_queue);
-
c = ngx_queue_data(q, ngx_connection_t, queue);
-
-
ngx_log_debug0(NGX_LOG_DEBUG_CORE, c->log, 0, "reusing connection");
-
-
c->close = 1; // 关闭该连接
-
c->read->handler(c->read);
-
}
-
}
6. 接受客户端连接请求
-
// 当客户端请求连接时
-
void ngx_event_accept(ngx_event_t *ev)
-
{
-
......
-
lc = ev->data;
-
ls = lc->listening;
-
ev->ready = 0;
-
// 接受客户端连接请求, accept
-
s = accept4(lc->fd, (struct sockaddr *) sa, &socklen, SOCK_NONBLOCK);
-
-
// 获取一个空闲连接。通过socket ,获取到connect结构,这样该socket就和connection相关连
-
c = ngx_get_connection(s, ev->log);
-
-
if (c == NULL) {
-
if (ngx_close_socket(s) == -1) {
-
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed");
-
}
-
-
return;
-
}
-
-
c->pool = ngx_create_pool(ls->pool_size, ev->log);
-
if (c->pool == NULL) {
-
ngx_close_accepted_connection(c);
-
return;
-
}
-
-
c->sockaddr = ngx_palloc(c->pool, socklen);
-
if (c->sockaddr == NULL) {
-
ngx_close_accepted_connection(c);
-
return;
-
}
-
// 拷贝客户端socket信息
-
ngx_memcpy(c->sockaddr, sa, socklen);
-
-
log = ngx_palloc(c->pool, sizeof(ngx_log_t));
-
if (log == NULL) {
-
ngx_close_accepted_connection(c);
-
return;
-
}
-
-
if (ngx_inherited_nonblocking) {
-
if (ngx_event_flags & NGX_USE_AIO_EVENT) {
-
if (ngx_blocking(s) == -1) { // aio,设置为阻塞模式
-
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed");
-
ngx_close_accepted_connection(c);
-
return;
-
}
-
}
-
-
} else {
-
if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
-
if (ngx_nonblocking(s) == -1) { // 非aio,设置为非阻塞模式
-
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed");
-
ngx_close_accepted_connection(c);
-
return;
-
}
-
}
-
}
-
-
*log = ls->log;
-
// 初始化回调函数
-
c->recv = ngx_recv; // 初始化 recv函数
-
c->send = ngx_send; // 初始化 send函数
-
c->recv_chain = ngx_recv_chain;
-
c->send_chain = ngx_send_chain;
-
-
c->log = log;
-
c->pool->log = log;
-
-
c->socklen = socklen;
-
c->listening = ls;
-
c->local_sockaddr = ls->sockaddr;
-
-
c->unexpected_eof = 1;
-
......
-
ls->handler(c); // ngx_http_init_connections
-
}
7. 关闭connection
-
// 关闭connection
-
void ngx_close_connection(ngx_connection_t *c)
-
{
-
ngx_err_t err;
-
ngx_uint_t log_error, level;
-
ngx_socket_t fd;
-
-
if (c->fd == -1) {
-
ngx_log_error(NGX_LOG_ALERT, c->log, 0, "connection already closed");
-
return;
-
}
-
// del定时读
-
if (c->read->timer_set) {
-
ngx_del_timer(c->read);
-
}
-
// del定时写
-
if (c->write->timer_set) {
-
ngx_del_timer(c->write);
-
}
-
-
if (ngx_del_conn) {
-
ngx_del_conn(c, NGX_CLOSE_EVENT);
-
-
} else {
-
if (c->read->active || c->read->disabled) {
-
ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT); // 从epoll中del读
-
}
-
-
if (c->write->active || c->write->disabled) {
-
ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);// 从epoll中del写
-
}
-
}
-
-
if (c->read->prev) {
-
ngx_delete_posted_event(c->read); // del 延时读
-
}
-
-
if (c->write->prev) {
-
ngx_delete_posted_event(c->write); // del 延时写
-
}
-
-
c->read->closed = 1;
-
c->write->closed = 1;
-
-
// 不放入到 reusable 队列
-
ngx_reusable_connection(c, 0);
-
-
log_error = c->log_error;
-
// 放入到 free_connections
-
ngx_free_connection(c);
-
-
fd = c->fd;
-
c->fd = (ngx_socket_t) -1;
-
// close socket
-
if (ngx_close_socket(fd) == -1) {
-
... ...
-
}
-
}
阅读(5502) | 评论(0) | 转发(0) |