Chinaunix首页 | 论坛 | 博客
  • 博客访问: 31640
  • 博文数量: 2
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 50
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-06 15:47
文章分类

全部博文(2)

文章存档

2017年(1)

2013年(1)

我的朋友

分类: C/C++

2017-11-11 00:14:46

Nginx 线程实现分析


日期:2017-11

Nginx版本:1.13.6

注:如果在Nginx中使用线程目前并不是为增加性能,而是非用线程不可,那可能你的程序或架构还有优化的空间。

如果觉得有用,转载请自便

内容目录


1. 简介

线程池从1.7.11版本开始加入。

线程context切换会消耗点CPU及内存,在大量并发时,累计的消耗可能也比较可观,有些追求极致性能的人舍不得浪费掉,故线程不会成为他们的选择。

我曾遇到一个场景:某些请求的处理,要循环读取海量数据,很久才完成,当这样的请求多发几次,nginxworkers悉数要“粘”在这些读取操作上,短期无法再回到epoll_wait(),在客户端看来,服务器已经“不响应”了。

当然服务器架构或是数据库的设计固然是有问题,但遇到这种情形“阻塞”的情形,不得已需要异步处理,避免一些操作霸占着worker,线程池可以是合适的选择。

2. 配置指令

要使用线程池,编译时期configure时明确地加参数--with-threads来编译:


  1. ./configure --with-threads && make && make install
      配置指令在参考资料[2]中有介绍,原文介绍已经很明确完整了,本处只是搬运下:

指令语法:

  1. thread_pool name threads=number [max_queue=number]
      默认: thread_pool default threads=32 max_queue=65536;

配置位置:main

name 为线程池之名;

number指明线程池内线程的数目(nginx启动时在init worker阶段,就会创建number个线程);

max_queue:如果当前线程池内无空闲线程,任务就挂在此线程池队列中;此参数指定该等待队列中的任务的最大数量,超出此限制时,再添加任务会报错。


  原文开头有句话:Defines named thread pools used for multi-threaded reading and sending of files without blocking worker processes.

目前官方Nginx中似乎没什么模块在使用线程池。


  1. [root@localhost nginx_dev]# grep -nr ngx_thread_task_alloc nginx-1.13.6
  2. nginx-1.13.6/src/core/ngx_thread_pool.h:32:ngx_thread_task_t *ngx_thread_task_alloc(ngx_pool_t *pool, size_t size);
  3. nginx-1.13.6/src/core/ngx_thread_pool.c:215:ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
  4. nginx-1.13.6/src/os/unix/ngx_files.c:108: task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_file_ctx_t));
  5. nginx-1.13.6/src/os/unix/ngx_files.c:490: task = ngx_thread_task_alloc(pool,
  6. nginx-1.13.6/src/os/unix/ngx_linux_sendfile_chain.c:328: task = ngx_thread_task_alloc(c->pool, sizeof(ngx_linux_sendfile_ctx_t));
   
   在文件读、写时可进行配置来使用。在需要使用aiolocation,配置:
  1. aio threads=;
       配置aio使用线程后,nginx的文件I/O能使用线程进行。

如果在自开发模块中使用线程池,继续往下。


3. Nginx模块中使用

3.1. 相关的基本结构


点击(此处)折叠或打开

  1. struct ngx_thread_task_s {
  2.     ngx_thread_task_t *next;
  3.     ngx_uint_t id;
  4.     void *ctx;
  5.     void (*handler)(void *data, ngx_log_t *log); // 线程任务
  6.     ngx_event_t event;
  7. };
  8. typedef struct ngx_thread_pool_s ngx_thread_pool_t;
     此是线程任务结构,定义在src/core/ngx_thread_pool.h文件中,用户需要设置的字段有:

ctx:此字段用户可自由定义,用于将数据从主线程传递给任务线程。

hander 这是个回调函数的指针,线程通过执行该函数来完成该任务!需要线程处理的工作,定义到此函数指针指向的回调函数即可。

线程执行任务是这样进行的:

  1. task->handler(task->ctx, tp->log);
 其中第二个参数中的tp是线程池。这个在使用线程池时不需关注。

event: 这个结构定义在src/event/ngx_event.h, 此结构字段比较多,只复制一定会需要(跟踪调试时发现会改动的)的几个:


点击(此处)折叠或打开

  1. typedef struct ngx_event_s ngx_event_t;
  2. struct ngx_event_s {
  3.     ...
  4.     unsigned active:1;
  5.     unsigned complete:1;
  6.     ngx_event_handler_pt handler;
  7.     ....
  8. };
       其中ngx_event_handler_pt也是声明handler为函数指针

其定义在src/core/ngx_core.h文件,定义如下:

  1. typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);
         activecomplete标志任务当前状态,thread pool模块设置的,用户需要时,读即可,不要去写。

active == 1表示任务已经提交并在等候或正在处理。active == 0表示任务没在处理了;

complete == 1表示任务已经完成。

此处的handler是任务被线程执行完成后,主线程重新接管时,执行的回调函数指针,主线程是这样回调的:


点击(此处)折叠或打开

  1. ...
  2. event = &task->event;
  3. event->handler(event);
  4. ...
3.2. 生成任务结构

使用:

  1. ngx_thread_task_t * ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
     分配线程任务结构。size是可以自定义线程context使用的一些结构,此函数为任务结构及ctx分配内存

成功返回后:

taskngx_thread_task_t.ctx 会指向该内存区域。

3.1.节的介绍填充需要的字段。

3.3. 获取线程池

thread_pool指令的第一个参数是线程池的名字,就是使用名称来获取参数的。

  1. ngx_thread_pool_t * ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
  该函数就是从Nginx中找出以name为名的线程池。


3.4. 提交任务到线程池

  1. ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
        tp3.3.节获取到的线程池。

task为任务3.2.节分配到的结构。

该函数主要的操作:

1task->active = 1

2)将task挂到tp的等待的任务队列中;

3)使用条件变量唤醒线程池内候命的线程;


任务线程通过:

ngx_epoll_notify(ngx_event_handler_pt handler)

通知主线程任务已经完成。

主线程从epoll_wait()中出来,执行。

提交任务后,坐等线程处理完成即可。

3.5. 处理任务的线程执行过程

1)从等待队列头取下一个任务task

2)执行任务:task->handler(task->ctx, tp->log);

3)完成任务后,向worker进程(主线程)的某个已经位于epoll中的专用于线程池的fd写数据;

4)继续下一任务

3.6. 主线程回归接管执行过程

1)主线程从epoll_wait()中被唤醒;

2)之后,从完成的任务队列中一次取下所有的完成的任务:

依次如下执行:


点击(此处)折叠或打开

  1. event = &task->event;
  2. event->complete = 1;
  3. event->active = 0;
  4. event->handler(event);

4. 可能出现的副作用

除了已经提到的CPU切换线程需要消耗一定时间外,可能还有个副作用,如果像本文档第6节的示例那般,一整个请求都使用线程去处理,将会限制并发数量!

在介绍thread_pool指令时,max_queue的参数限制了等待处理的任务数量上限,上限到达后,在任何任务完成前,不能再加入新的任务。间接说明:如果所有请求全程处理都使用线程,当等待处理的请求数量达到max_queue设定的数值时,后续的请求会因为不能被处理而报错。

还是开篇那句话,如果使用线程不是为了提高性能,而是非用不可,则程序的设计或服务器的架构可能还有优化的空间。

5. 参考资料

[1] https://www.nginx.com/blog/thread-pools-boost-performance-9x/

[2]


6. 代码示例

// file: ngx_http_block_request_module.c

点击(此处)折叠或打开

  1. #include <ngx_config.h>
  2. #include <ngx_http.h>
  3. #include <ngx_core.h>

  4. static ngx_str_t blk_tp_name = ngx_string("blk_thread_pool"); // 线程池之名
  5. // 定义一个context结构,使用它从主线程传递参数到任务线程中
  6. // 可按需自由定义
  7. typedef struct {
  8.     ngx_http_request_t *r;
  9.     ngx_fd_t fd;
  10.     ngx_buf_t * buf;
  11.     ngx_int_t err;
  12. } ngx_http_block_request_ctx_t;

  13. static ngx_int_t ngx_http_block_request_response(ngx_http_request_t *r, ngx_buf_t *buf);
  14. static void ngx_http_block_request_thread_event_handler(ngx_event_t *ev);
  15. static void ngx_http_block_request_thread_handler(void *data, ngx_log_t *log);
  16. static ngx_int_t ngx_http_block_request_handler(ngx_http_request_t *r);
  17. static char * ngx_http_block_request(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);

  18. ngx_command_t ngx_http_block_request_commands [] = {
  19.     {
  20.         .name = ngx_string("block_request"),
  21.         .type = NGX_HTTP_LOC_CONF | NGX_CONF_NOARGS,
  22.         .set = ngx_http_block_request,
  23.         .conf = 0,
  24.         .offset = 0,
  25.         .post = NULL
  26.     },
  27.     ngx_null_command
  28. };

  29. static ngx_http_module_t ngx_http_block_request_module_ctx = {
  30.     .preconfiguration = NULL,
  31.     .postconfiguration = NULL,

  32.     .create_main_conf = NULL,
  33.     .init_main_conf = NULL,

  34.     .create_srv_conf = NULL,
  35.     .merge_srv_conf = NULL,

  36.     .create_loc_conf = NULL,
  37.     .merge_loc_conf = NULL
  38. };

  39. ngx_module_t ngx_http_block_request_module = {
  40.     NGX_MODULE_V1,
  41.     &ngx_http_block_request_module_ctx,
  42.     ngx_http_block_request_commands,
  43.     NGX_HTTP_MODULE,
  44.     NULL,
  45.     NULL,
  46.     NULL,
  47.     NULL,
  48.     NULL,
  49.     NULL,
  50.     NULL,
  51.     NGX_MODULE_V1_PADDING
  52. };

  53. static char * ngx_http_block_request(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  54. {
  55.     ngx_http_core_loc_conf_t * clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
  56.     clcf->handler = ngx_http_block_request_handler;

  57.     return NGX_CONF_OK;
  58. }
  59. static ngx_int_t ngx_http_block_request_handler(ngx_http_request_t *r)
  60. {
  61.     ngx_thread_pool_t * tp;
  62.     ngx_thread_task_t * task;
  63.     ngx_http_block_request_ctx_t *ctx;
  64.     ngx_buf_t *b;

  65.     /* ngx_cycle 是全局变量 */
  66.     // 取出nginx.conf中thread_pool定义的名为blk_tp_name的线程池
  67.     tp = ngx_thread_pool_get((ngx_cycle_t *)ngx_cycle, &blk_tp_name);
  68.     if (!tp)
  69.     {
  70.         return NGX_ERROR;
  71.     }
  72.     // 分配一个任务结构,多分配的sizeof(*ctx)即是开头定义的ctx结构,用于在主线程与任务线程
  73.     // 间传递参数,此函数成功执行后 task->ctx 即指向多分配的 sizeof(*ctx) 内在区。
  74.     task = ngx_thread_task_alloc(r->pool, sizeof(*ctx));
  75.     if (!task)
  76.     {
  77.         return NGX_ERROR;
  78.     }
  79.     ctx = task->ctx;
  80.     // 字段按需自行填充,此处仅是示例
  81.     task->handler = ngx_http_block_request_thread_handler; // 线程会执行任务
  82.     task->event.data = ctx; // 跟踪调试未发现data被Nginx需要,所以暂存个数据,主线程回归后能够使用
  83.     task->event.handler = ngx_http_block_request_thread_event_handler; // 任务被线程处理完成后,主线程回调的函数

  84.     ctx->r = r;
  85.     ctx->fd = -1; // test
  86.     b = ngx_pcalloc(r->pool, sizeof(ngx_buf_t));
  87.     if (!b)
  88.     {
  89.         return NGX_ERROR;
  90.     }
  91.     ctx->buf = b;

  92.     if (ngx_thread_task_post(tp, task) != NGX_OK)
  93.     {
  94.         return NGX_ERROR;
  95.     }
  96.     // 让nginx 暂时不要关闭连接,线程处理任务完成后,还要使用该连接响应数据给客户端
  97.     r->connection->buffered |= NGX_HTTP_WRITE_BUFFERED;
  98.     r->connection->write->delayed = 1;

  99.     return NGX_AGAIN;
  100.  }

  101. /* 任务线程去执行的handler,需要线程处理的任务定义于此 */
  102. static void ngx_http_block_request_thread_handler(void *data, ngx_log_t *log)
  103. {
  104.     ngx_http_block_request_ctx_t *ctx = data; // data即task->ctx
  105.     ngx_buf_t *b;
  106.     b = ctx->buf;
  107.     ngx_int_t i = 7;
  108.     b->start = ngx_pcalloc(ctx->r->pool, 120);
  109.     if (b->start == NULL)
  110.     {
  111.         ctx->err = NGX_ERROR;
  112.         return ;
  113.     }
  114.     b->end = (u_char *)(b->pos) + 120;
  115.     b->pos = b->start;
  116.     b->last = b->pos;
  117.     while (i > 0)
  118.     {
  119.         ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "ngx_http_block_request_thread_handler");
  120.         ngx_sleep (1);
  121.         i--;
  122.         *(b->last) = 'a'; // 这个函数做的工作就是每秒产生一个字符a
  123.         b->last = (u_char *)(b->last) + 1;
  124.     }
  125.     b->memory = 1;
  126.     b->last_buf = 1;
  127.     ctx->err = NGX_OK;
  128. }

  129. // 处理完成之后, 主线程接管后, 会回调的函数
  130. static void ngx_http_block_request_thread_event_handler(ngx_event_t *ev)
  131. {
  132.     ngx_http_block_request_ctx_t *ctx = ev->data; // ev即task->event->data,自行设置的值
  133.     ngx_http_request_t *r = ctx->r;

  134.     // 完成后,记得清理这些设置,不然,Nginx仍然当作还有数据要处理而继续等待,
  135.     // 不去响应客户端
  136.     r->connection->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
  137.     r->connection->write->delayed = 0;
  138.     // 由于线程的加入,扰乱了原有的执行流程,这里需要
  139.     // ngx_http_finalize_request()
  140.     // 来让服务器主动关闭连接
  141.     ngx_http_finalize_request(r, ngx_http_block_request_response(r, ctx->buf));
  142. }

  143. static ngx_int_t ngx_http_block_request_response(ngx_http_request_t *r, ngx_buf_t *buf)
  144. {
  145.     ngx_chain_t out;

  146.     r->headers_out.status = NGX_HTTP_OK;
  147.     r->headers_out.content_type.len = sizeof("text/html") - 1;
  148.     r->headers_out.content_type.data = (u_char *) "text/html";
  149.     r->headers_out.content_length_n = (u_char *)(buf->last) - (u_char *)(buf->pos);

  150.     ngx_http_send_header(r);

  151.     out.buf = buf;
  152.     out.next = NULL;
  153.     buf->memory = 1;
  154.     buf->last_buf = 1;
  155.     return ngx_http_output_filter(r, &out);
  156. }


// file: config

点击(此处)折叠或打开

  1. ngx_addon_name="ngx_http_block_request_module"
  2. HTTP_MODULES="$HTTP_MODULES ngx_http_block_request_module"
  3. NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_addon_dir/ngx_http_block_request_module.c"
// file: cong/nginx.conf的相关配置(片段)

点击(此处)折叠或打开

  1. ...
  2. thread_pool blk_thread_pool threads=1; # 定义线程池
  3. ...
  4.    location / {
  5.         root html;
  6.         block_request ;
  7.         index index.html index.htm;
  8.    }
  9. ...


阅读(1473) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

tseesing2017-11-11 00:20:44

更改第2节,应该是搜索 ngx_thread_pool_get 更贴切:
[root@localhost nginx_dev]# grep -nr ngx_thread_pool_get ./nginx-1.13.6
./nginx-1.13.6/src/core/ngx_thread_pool.h:30:ngx_thread_pool_t *ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name);
./nginx-1.13.6/src/core/ngx_thread_pool.c:530:    tp = ngx_thread_pool_get(cf->cycle, name); 
./nginx-1.13.6/src/core/ngx_thread_pool.c:562:ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
./nginx-1.13.6/src/http/ngx_http_f