Chinaunix首页 | 论坛 | 博客
  • 博客访问: 158376
  • 博文数量: 22
  • 博客积分: 425
  • 博客等级: 下士
  • 技术积分: 350
  • 用 户 组: 普通用户
  • 注册时间: 2012-05-15 09:43
个人简介

专注服务器设计、开发; https://github.com/justscu;

文章分类

全部博文(22)

文章存档

2014年(10)

2013年(2)

2012年(10)

分类: 服务器与存储

2014-04-22 17:06:41

1. 连接池的作用
    为了提高Nginx的访问速度,Nginx使用了连接池。连接池是一个数组,里面预先分配了很多个(根据配置文件的配置)ngx_connection_s结构。
    当有客户端请求连接时,就从该数组中找到一个没有使用的ngx_connection_s,用来连接用户。
    当用户close时,Nginx并没有释放掉这些数组,而是标记为可连接,然后等待下个客户端的连接。

2. ngx_connection_s 结构
  1. struct ngx_connection_s {
  2.     void *data;
  3.     ngx_event_t *read;
  4.     ngx_event_t *write;

  5.     ngx_socket_t fd; // 系统为该用户分配的fd
  6.     // 回调函数
  7.     ngx_recv_pt recv; // 接收 回调函数
  8.     ngx_send_pt send;
  9.     ngx_recv_chain_pt recv_chain; // 接收chain的回调函数
  10.     ngx_send_chain_pt send_chain;

  11.     ngx_listening_t *listening;

  12.     off_t sent; // 已发送的字节数

  13.     ngx_log_t *log;

  14.     ngx_pool_t *pool;

  15.     struct sockaddr *sockaddr;
  16.     socklen_t socklen;
  17.     ngx_str_t addr_text;

  18.     struct sockaddr *local_sockaddr;

  19.     ngx_buf_t *buffer; // buffer

  20.     ngx_queue_t queue;

  21.     ngx_atomic_uint_t number;

  22.     ngx_uint_t requests; // 该链接 请求的次数

  23.     unsigned buffered:8;

  24.     unsigned log_error:3; /* ngx_connection_log_error_e */

  25.     unsigned unexpected_eof:1;
  26.     unsigned timedout:1;
  27.     unsigned error:1;
  28.     unsigned destroyed:1;

  29.     unsigned idle:1;
  30.     unsigned reusable:1; // 是否可以 重复使用该链接
  31.     unsigned close:1;

  32.     unsigned sendfile:1;
  33.     unsigned sndlowat:1;
  34.     unsigned tcp_nodelay:2; /* ngx_connection_tcp_nodelay_e */
  35.     unsigned tcp_nopush:2; /* ngx_connection_tcp_nopush_e */
  36. };

  37. // cycle全局数据结构
  38. struct ngx_cycle_s {
  39.     ......
  40.     ngx_connection_t **files; //连接文件
  41.     ngx_connection_t *free_connections; // 空闲连接
  42.     ngx_uint_t free_connection_n; // 空闲连接个数

  43.     ngx_queue_t reusable_connections_queue;//可复用 连接队列

  44.     ngx_array_t listening; //监听 socket队列[Nginx服务器可能同时监听几个端口].
  45.     ngx_array_t paths;
  46.     ngx_list_t open_files; // 打开的文件链表
  47.     ngx_list_t shared_memory; // 共享内存 链表

  48.     ngx_uint_t connection_n; // work_connections,配置文件中设置 "worker_connections"
  49.     ngx_uint_t files_n;

  50.     ngx_connection_t *connections; // 指向 ngx_connection_s 数组即连接池
  51.     ngx_event_t *read_events; // 指向 读事件 数组
  52.     ngx_event_t *write_events;// 指向 写事件 数组
  53.     ......
  54. }

3. 连接池初始化

点击(此处)折叠或打开

  1. // 分配 ngx_connections_t 数组
  2. static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle)
  3. {
  4.     ... ...
  5.     if (ngx_event_flags & NGX_USE_FD_EVENT) {
  6.         struct rlimit rlmt; //资源限制

  7.         if (getrlimit(RLIMIT_NOFILE, &rlmt) == -1) { //RLIMIT_NOFILE,一个进程能够打开的最大fd数. ulimit -n
  8.             ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "getrlimit(RLIMIT_NOFILE) failed");
  9.             return NGX_ERROR;
  10.         }

  11.         cycle->files_n = (ngx_uint_t) rlmt.rlim_cur;

  12.         cycle->files = ngx_calloc(sizeof(ngx_connection_t *) * cycle->files_n, cycle->log);
  13.         if (cycle->files == NULL) {
  14.             return NGX_ERROR;
  15.         }
  16.     }
  17.     ... ...
  18.     // 为connections分配内存
  19.     cycle->connections = ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
  20.     if (cycle->connections == NULL) {
  21.         return NGX_ERROR;
  22.     }

  23.     c = cycle->connections;
  24.     // 为read_events分配内存
  25.     cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log);
  26.     if (cycle->read_events == NULL) {
  27.         return NGX_ERROR;
  28.     }

  29.     rev = cycle->read_events;
  30.     for (i = 0; i < cycle->connection_n; i++) {
  31.         rev[i].closed = 1;
  32.         rev[i].instance = 1;
  33.     }
  34.     // 为write_events分配内存
  35.     cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n, cycle->log);
  36.     if (cycle->write_events == NULL) {
  37.         return NGX_ERROR;
  38.     }

  39.     wev = cycle->write_events;
  40.     for (i = 0; i < cycle->connection_n; i++) {
  41.         wev[i].closed = 1;
  42.     }
  43.     
  44.     i = cycle->connection_n;
  45.     next = NULL;

  46.     do {
  47.         i--;
  48.         // 每个connection,对应一个read_events, 一个write_events
  49.         c[i].data = next;
  50.         c[i].read = &cycle->read_events[i];
  51.         c[i].write = &cycle->write_events[i];
  52.         c[i].fd = (ngx_socket_t) -1;

  53.         next = &c[i];

  54.     } while (i);

  55.     cycle->free_connections = next;//指向connection[0]
  56.     cycle->free_connection_n = cycle->connection_n; //free链接数
  57.     
  58.     // 给监听端口,分配资源
  59.     ls = cycle->listening.elts;
  60.     for (i = 0; i < cycle->listening.nelts; i++) {
  61.         //传入listen的fd,返回一个connection
  62.         c = ngx_get_connection(ls[i].fd, cycle->log);

  63.         if (c == NULL) {
  64.             return NGX_ERROR;
  65.         }

  66.         c->log = &ls[i].log;

  67.         c->listening = &ls[i];
  68.         ls[i].connection = c;

  69.         rev = c->read;

  70.         rev->log = c->log;
  71.         rev->accept = 1;
  72.         
  73.         
  74. #if (NGX_HAVE_DEFERRED_ACCEPT)
  75.         rev->deferred_accept = ls[i].deferred_accept;
  76. #endif
  77.         // 注册事件处理函数
  78.         rev->handler = ngx_event_accept;
  79.         // 若使用accept_mutex来解决惊群问题,直接返回
  80.         if (ngx_use_accept_mutex) {
  81.             continue;
  82.         }

  83.         if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
  84.             if (ngx_add_conn(c) == NGX_ERROR) {
  85.                 return NGX_ERROR;
  86.             }

  87.         } else {    // 将READ事件加入到epoll中
  88.             if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
  89.                 return NGX_ERROR;
  90.             }
  91.         }
  92.     }

  93.     return NGX_OK;
  94. }

4. 从连接池中获取可用的连接资源

点击(此处)折叠或打开

  1. // 找到一个空闲的、可用的connection [不用去重新分配内存]
  2. ngx_connection_t* ngx_get_connection(ngx_socket_t s, ngx_log_t *log)
  3. {
  4.     ngx_uint_t instance;
  5.     ngx_event_t *rev, *wev;
  6.     ngx_connection_t *c;

  7.     // socket连接数太大
  8.     if (ngx_cycle->files && (ngx_uint_t) s >= ngx_cycle->files_n) {
  9.         ngx_log_error(NGX_LOG_ALERT, log, 0,
  10.                       "the new socket has number %d, "
  11.                       "but only %ui files are available",
  12.                       s, ngx_cycle->files_n);
  13.         return NULL;
  14.     }

  15.     c = ngx_cycle->free_connections;
  16.     // 当free_connection中没有可以用的connection时,扫描连接池,找到一个可以用的
  17.     if (c == NULL) {
  18.         ngx_drain_connections();
  19.         c = ngx_cycle->free_connections;
  20.     }

  21.     if (c == NULL) {
  22.         ngx_log_error(NGX_LOG_ALERT, log, 0,
  23.                       "%ui worker_connections are not enough",
  24.                       ngx_cycle->connection_n);

  25.         return NULL;
  26.     }

  27.     ngx_cycle->free_connections = c->data; // 指向下一个connections
  28.     ngx_cycle->free_connection_n--; // 空闲connection数-1

  29.     if (ngx_cycle->files) {
  30.         ngx_cycle->files[s] = c;
  31.     }

  32.     rev = c->read;
  33.     wev = c->write;

  34.     ngx_memzero(c, sizeof(ngx_connection_t));

  35.     c->read = rev;
  36.     c->write = wev;
  37.     c->fd = s;
  38.     c->log = log;

  39.     instance = rev->instance;

  40.     ngx_memzero(rev, sizeof(ngx_event_t));
  41.     ngx_memzero(wev, sizeof(ngx_event_t));

  42.     rev->instance = !instance;
  43.     wev->instance = !instance;

  44.     rev->index = NGX_INVALID_INDEX;
  45.     wev->index = NGX_INVALID_INDEX;

  46.     rev->data = c;
  47.     wev->data = c;

  48.     wev->write = 1;

  49.     return c;
  50. }

5. 当free_connections中没有可用的连接时,从reusable_connections_queue中找

点击(此处)折叠或打开

  1. static void ngx_drain_connections(void)
  2. {
  3.     ngx_int_t i;
  4.     ngx_queue_t *q;
  5.     ngx_connection_t *c;
  6.     // 从可复用的链接队列里面,获取connection.
  7.     for (i = 0; i < 32; i++) {
  8.         if (ngx_queue_empty(&ngx_cycle->reusable_connections_queue)) {
  9.             break;
  10.         }

  11.         q = ngx_queue_last(&ngx_cycle->reusable_connections_queue);
  12.         c = ngx_queue_data(q, ngx_connection_t, queue);

  13.         ngx_log_debug0(NGX_LOG_DEBUG_CORE, c->log, 0, "reusing connection");

  14.         c->close = 1; // 关闭该连接
  15.         c->read->handler(c->read);
  16.     }
  17. }

6. 接受客户端连接请求

点击(此处)折叠或打开

  1. // 当客户端请求连接时
  2. void ngx_event_accept(ngx_event_t *ev)
  3. {
  4.     ......
  5.     lc = ev->data;
  6.     ls = lc->listening;
  7.     ev->ready = 0;
  8.     // 接受客户端连接请求, accept
  9.     s = accept4(lc->fd, (struct sockaddr *) sa, &socklen, SOCK_NONBLOCK);
  10.     
  11.     // 获取一个空闲连接。通过socket ,获取到connect结构,这样该socket就和connection相关连
  12.     c = ngx_get_connection(s, ev->log);

  13.     if (c == NULL) {
  14.         if (ngx_close_socket(s) == -1) {
  15.             ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed");
  16.         }

  17.         return;
  18.     }
  19.     
  20.     c->pool = ngx_create_pool(ls->pool_size, ev->log);
  21.     if (c->pool == NULL) {
  22.         ngx_close_accepted_connection(c);
  23.         return;
  24.     }

  25.     c->sockaddr = ngx_palloc(c->pool, socklen);
  26.     if (c->sockaddr == NULL) {
  27.         ngx_close_accepted_connection(c);
  28.         return;
  29.     }
  30.     // 拷贝客户端socket信息
  31.     ngx_memcpy(c->sockaddr, sa, socklen);

  32.     log = ngx_palloc(c->pool, sizeof(ngx_log_t));
  33.     if (log == NULL) {
  34.         ngx_close_accepted_connection(c);
  35.         return;
  36.     }
  37.         
  38.     if (ngx_inherited_nonblocking) {
  39.         if (ngx_event_flags & NGX_USE_AIO_EVENT) {
  40.             if (ngx_blocking(s) == -1) { // aio,设置为阻塞模式
  41.                 ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed");
  42.                 ngx_close_accepted_connection(c);
  43.                 return;
  44.             }
  45.         }

  46.     } else {
  47.         if (!(ngx_event_flags & (NGX_USE_AIO_EVENT|NGX_USE_RTSIG_EVENT))) {
  48.             if (ngx_nonblocking(s) == -1) { // 非aio,设置为非阻塞模式
  49.                 ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed");
  50.                 ngx_close_accepted_connection(c);
  51.                 return;
  52.             }
  53.         }
  54.     }
  55.     
  56.     *log = ls->log;
  57.     // 初始化回调函数
  58.     c->recv = ngx_recv; // 初始化 recv函数
  59.     c->send = ngx_send; // 初始化 send函数
  60.     c->recv_chain = ngx_recv_chain;
  61.     c->send_chain = ngx_send_chain;

  62.     c->log = log;
  63.     c->pool->log = log;

  64.     c->socklen = socklen;
  65.     c->listening = ls;
  66.     c->local_sockaddr = ls->sockaddr;

  67.     c->unexpected_eof = 1;
  68.     ......
  69.     ls->handler(c); // ngx_http_init_connections
  70. }

7. 关闭connection

点击(此处)折叠或打开

  1. // 关闭connection
  2. void ngx_close_connection(ngx_connection_t *c)
  3. {
  4.     ngx_err_t err;
  5.     ngx_uint_t log_error, level;
  6.     ngx_socket_t fd;

  7.     if (c->fd == -1) {
  8.         ngx_log_error(NGX_LOG_ALERT, c->log, 0, "connection already closed");
  9.         return;
  10.     }
  11.     // del定时读
  12.     if (c->read->timer_set) {
  13.         ngx_del_timer(c->read);
  14.     }
  15.     // del定时写
  16.     if (c->write->timer_set) {
  17.         ngx_del_timer(c->write);
  18.     }

  19.     if (ngx_del_conn) {
  20.         ngx_del_conn(c, NGX_CLOSE_EVENT);

  21.     } else {
  22.         if (c->read->active || c->read->disabled) {
  23.             ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT); // 从epoll中del读
  24.         }

  25.         if (c->write->active || c->write->disabled) {
  26.             ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);// 从epoll中del写
  27.         }
  28.     }
  29.     
  30.     if (c->read->prev) {
  31.         ngx_delete_posted_event(c->read); // del 延时读
  32.     }

  33.     if (c->write->prev) {
  34.         ngx_delete_posted_event(c->write); // del 延时写
  35.     }

  36.     c->read->closed = 1;
  37.     c->write->closed = 1;
  38.     
  39.     // 不放入到 reusable 队列
  40.     ngx_reusable_connection(c, 0);

  41.     log_error = c->log_error;
  42.     // 放入到 free_connections
  43.     ngx_free_connection(c);

  44.     fd = c->fd;
  45.     c->fd = (ngx_socket_t) -1;
  46.     // close socket
  47.     if (ngx_close_socket(fd) == -1) {
  48.         ... ...
  49.     }
  50. }


阅读(5502) | 评论(0) | 转发(0) |
0

上一篇:Nginx之三:惊群

下一篇:进程间通信

给主人留下些什么吧!~~