Nginx 线程实现分析
日期:2017-11
Nginx版本:1.13.6
注:如果在Nginx中使用线程目前并不是为增加性能,而是非用线程不可,那可能你的程序或架构还有优化的空间。
如果觉得有用,转载请自便
1. 简介
线程池从1.7.11版本开始加入。
线程context切换会消耗点CPU及内存,在大量并发时,累计的消耗可能也比较可观,有些追求极致性能的人舍不得浪费掉,故线程不会成为他们的选择。
我曾遇到一个场景:某些请求的处理,要循环读取海量数据,很久才完成,当这样的请求多发几次,nginx的workers悉数要“粘”在这些读取操作上,短期无法再回到epoll_wait(),在客户端看来,服务器已经“不响应”了。
当然服务器架构或是数据库的设计固然是有问题,但遇到这种情形“阻塞”的情形,不得已需要异步处理,避免一些操作霸占着worker,线程池可以是合适的选择。
2. 配置指令
要使用线程池,编译时期configure时明确地加参数--with-threads来编译:
-
./configure --with-threads && make && make install
配置指令在参考资料
[2]中有介绍,原文介绍已经很明确完整了,本处只是搬运下:
指令语法:
-
thread_pool name threads=number [max_queue=number]
默认:
thread_pool default threads=32 max_queue=65536;
配置位置:main
name 为线程池之名;
number指明线程池内线程的数目(nginx启动时在init
worker阶段,就会创建number个线程);
max_queue:如果当前线程池内无空闲线程,任务就挂在此线程池队列中;此参数指定该等待队列中的任务的最大数量,超出此限制时,再添加任务会报错。
原文开头有句话:Defines
named thread pools used for multi-threaded
reading and sending of files without blocking worker
processes.
目前官方Nginx中似乎没什么模块在使用线程池。
-
[root@localhost nginx_dev]# grep -nr ngx_thread_task_alloc nginx-1.13.6
-
nginx-1.13.6/src/core/ngx_thread_pool.h:32:ngx_thread_task_t *ngx_thread_task_alloc(ngx_pool_t *pool, size_t size);
-
nginx-1.13.6/src/core/ngx_thread_pool.c:215:ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
-
nginx-1.13.6/src/os/unix/ngx_files.c:108: task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_file_ctx_t));
-
nginx-1.13.6/src/os/unix/ngx_files.c:490: task = ngx_thread_task_alloc(pool,
-
nginx-1.13.6/src/os/unix/ngx_linux_sendfile_chain.c:328: task = ngx_thread_task_alloc(c->pool, sizeof(ngx_linux_sendfile_ctx_t));
在文件读、写时可进行配置来使用。在需要使用
aio的
location,配置
:
配置
aio使用线程后,
nginx的文件
I/O能使用线程进行。
如果在自开发模块中使用线程池,继续往下。
3. 在Nginx模块中使用
3.1. 相关的基本结构
-
struct ngx_thread_task_s {
-
ngx_thread_task_t *next;
-
ngx_uint_t id;
-
void *ctx;
-
void (*handler)(void *data, ngx_log_t *log); // 线程任务
-
ngx_event_t event;
-
};
-
typedef struct ngx_thread_pool_s ngx_thread_pool_t;
此是线程任务结构,定义在
src/core/ngx_thread_pool.h文件中,用户需要设置的字段有:
ctx:此字段用户可自由定义,用于将数据从主线程传递给任务线程。
hander: 这是个回调函数的指针,线程通过执行该函数来完成该任务!需要线程处理的工作,定义到此函数指针指向的回调函数即可。
线程执行任务是这样进行的:
-
task->handler(task->ctx, tp->log);
其中第二个参数中的
tp是线程池。这个在使用线程池时不需关注。
event: 这个结构定义在src/event/ngx_event.h, 此结构字段比较多,只复制一定会需要(跟踪调试时发现会改动的)的几个:
-
typedef struct ngx_event_s ngx_event_t;
-
struct ngx_event_s {
-
...
-
unsigned active:1;
-
unsigned complete:1;
-
ngx_event_handler_pt handler;
-
....
-
};
其中
ngx_event_handler_pt也是声明
handler为函数指针
其定义在src/core/ngx_core.h文件,定义如下:
-
typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);
active及
complete标志任务当前状态,
thread
pool模块设置的,用户需要时,读即可,不要去写。
active
== 1表示任务已经提交并在等候或正在处理。active
== 0表示任务没在处理了;
complete
== 1表示任务已经完成。
此处的handler是任务被线程执行完成后,主线程重新接管时,执行的回调函数指针,主线程是这样回调的:
-
...
-
event = &task->event;
-
event->handler(event);
-
...
3.2. 生成任务结构
使用:
-
ngx_thread_task_t * ngx_thread_task_alloc(ngx_pool_t *pool, size_t size) ;
分配线程任务结构。
size是可以自定义线程
context使用的一些结构,此函数为任务结构及
ctx分配内存
成功返回后:
taskngx_thread_task_t.ctx 会指向该内存区域。
按3.1.节的介绍填充需要的字段。
3.3. 获取线程池
thread_pool指令的第一个参数是线程池的名字,就是使用名称来获取参数的。
-
ngx_thread_pool_t * ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
该函数就是从
Nginx中找出以
name为名的线程池。
3.4. 提交任务到线程池
-
ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
tp为
3.3.节获取到的线程池。
task为任务3.2.节分配到的结构。
该函数主要的操作:
1)task->active
= 1;
2)将task挂到tp的等待的任务队列中;
3)使用条件变量唤醒线程池内候命的线程;
任务线程通过:
ngx_epoll_notify(ngx_event_handler_pt
handler)
通知主线程任务已经完成。
主线程从epoll_wait()中出来,执行。
提交任务后,坐等线程处理完成即可。
3.5. 处理任务的线程执行过程
1)从等待队列头取下一个任务task;
2)执行任务:task->handler(task->ctx,
tp->log);
3)完成任务后,向worker进程(主线程)的某个已经位于epoll中的专用于线程池的fd写数据;
4)继续下一任务
3.6. 主线程回归接管执行过程
1)主线程从epoll_wait()中被唤醒;
2)之后,从完成的任务队列中一次取下所有的完成的任务:
依次如下执行:
-
event = &task->event;
-
event->complete = 1;
-
event->active = 0;
-
event->handler(event);
4. 可能出现的副作用
除了已经提到的CPU切换线程需要消耗一定时间外,可能还有个副作用,如果像本文档第6节的示例那般,一整个请求都使用线程去处理,将会限制并发数量!
在介绍thread_pool指令时,max_queue的参数限制了等待处理的任务数量上限,上限到达后,在任何任务完成前,不能再加入新的任务。间接说明:如果所有请求全程处理都使用线程,当等待处理的请求数量达到max_queue设定的数值时,后续的请求会因为不能被处理而报错。
还是开篇那句话,如果使用线程不是为了提高性能,而是非用不可,则程序的设计或服务器的架构可能还有优化的空间。
5. 参考资料
[1] https://www.nginx.com/blog/thread-pools-boost-performance-9x/
[2]
6. 代码示例
//
file: ngx_http_block_request_module.c
-
#include <ngx_config.h>
-
#include <ngx_http.h>
-
#include <ngx_core.h>
-
-
static ngx_str_t blk_tp_name = ngx_string("blk_thread_pool"); // 线程池之名
-
// 定义一个context结构,使用它从主线程传递参数到任务线程中
-
// 可按需自由定义
-
typedef struct {
-
ngx_http_request_t *r;
-
ngx_fd_t fd;
-
ngx_buf_t * buf;
-
ngx_int_t err;
-
} ngx_http_block_request_ctx_t;
-
-
static ngx_int_t ngx_http_block_request_response(ngx_http_request_t *r, ngx_buf_t *buf);
-
static void ngx_http_block_request_thread_event_handler(ngx_event_t *ev);
-
static void ngx_http_block_request_thread_handler(void *data, ngx_log_t *log);
-
static ngx_int_t ngx_http_block_request_handler(ngx_http_request_t *r);
-
static char * ngx_http_block_request(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
-
-
ngx_command_t ngx_http_block_request_commands [] = {
-
{
-
.name = ngx_string("block_request"),
-
.type = NGX_HTTP_LOC_CONF | NGX_CONF_NOARGS,
-
.set = ngx_http_block_request,
-
.conf = 0,
-
.offset = 0,
-
.post = NULL
-
},
-
ngx_null_command
-
};
-
-
static ngx_http_module_t ngx_http_block_request_module_ctx = {
-
.preconfiguration = NULL,
-
.postconfiguration = NULL,
-
-
.create_main_conf = NULL,
-
.init_main_conf = NULL,
-
-
.create_srv_conf = NULL,
-
.merge_srv_conf = NULL,
-
-
.create_loc_conf = NULL,
-
.merge_loc_conf = NULL
-
};
-
-
ngx_module_t ngx_http_block_request_module = {
-
NGX_MODULE_V1,
-
&ngx_http_block_request_module_ctx,
-
ngx_http_block_request_commands,
-
NGX_HTTP_MODULE,
-
NULL,
-
NULL,
-
NULL,
-
NULL,
-
NULL,
-
NULL,
-
NULL,
-
NGX_MODULE_V1_PADDING
-
};
-
-
static char * ngx_http_block_request(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
-
{
-
ngx_http_core_loc_conf_t * clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
-
clcf->handler = ngx_http_block_request_handler;
-
-
return NGX_CONF_OK;
-
}
-
static ngx_int_t ngx_http_block_request_handler(ngx_http_request_t *r)
-
{
-
ngx_thread_pool_t * tp;
-
ngx_thread_task_t * task;
-
ngx_http_block_request_ctx_t *ctx;
-
ngx_buf_t *b;
-
-
/* ngx_cycle 是全局变量 */
-
// 取出nginx.conf中thread_pool定义的名为blk_tp_name的线程池
-
tp = ngx_thread_pool_get((ngx_cycle_t *)ngx_cycle, &blk_tp_name);
-
if (!tp)
-
{
-
return NGX_ERROR;
-
}
-
// 分配一个任务结构,多分配的sizeof(*ctx)即是开头定义的ctx结构,用于在主线程与任务线程
-
// 间传递参数,此函数成功执行后 task->ctx 即指向多分配的 sizeof(*ctx) 内在区。
-
task = ngx_thread_task_alloc(r->pool, sizeof(*ctx));
-
if (!task)
-
{
-
return NGX_ERROR;
-
}
-
ctx = task->ctx;
-
// 字段按需自行填充,此处仅是示例
-
task->handler = ngx_http_block_request_thread_handler; // 线程会执行任务
-
task->event.data = ctx; // 跟踪调试未发现data被Nginx需要,所以暂存个数据,主线程回归后能够使用
-
task->event.handler = ngx_http_block_request_thread_event_handler; // 任务被线程处理完成后,主线程回调的函数
-
-
ctx->r = r;
-
ctx->fd = -1; // test
-
b = ngx_pcalloc(r->pool, sizeof(ngx_buf_t));
-
if (!b)
-
{
-
return NGX_ERROR;
-
}
-
ctx->buf = b;
-
-
if (ngx_thread_task_post(tp, task) != NGX_OK)
-
{
-
return NGX_ERROR;
-
}
-
// 让nginx 暂时不要关闭连接,线程处理任务完成后,还要使用该连接响应数据给客户端
-
r->connection->buffered |= NGX_HTTP_WRITE_BUFFERED;
-
r->connection->write->delayed = 1;
-
-
return NGX_AGAIN;
-
}
-
-
/* 任务线程去执行的handler,需要线程处理的任务定义于此 */
-
static void ngx_http_block_request_thread_handler(void *data, ngx_log_t *log)
-
{
-
ngx_http_block_request_ctx_t *ctx = data; // data即task->ctx
-
ngx_buf_t *b;
-
b = ctx->buf;
-
ngx_int_t i = 7;
-
b->start = ngx_pcalloc(ctx->r->pool, 120);
-
if (b->start == NULL)
-
{
-
ctx->err = NGX_ERROR;
-
return ;
-
}
-
b->end = (u_char *)(b->pos) + 120;
-
b->pos = b->start;
-
b->last = b->pos;
-
while (i > 0)
-
{
-
ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "ngx_http_block_request_thread_handler");
-
ngx_sleep (1);
-
i--;
-
*(b->last) = 'a'; // 这个函数做的工作就是每秒产生一个字符a
-
b->last = (u_char *)(b->last) + 1;
-
}
-
b->memory = 1;
-
b->last_buf = 1;
-
ctx->err = NGX_OK;
-
}
-
-
// 处理完成之后, 主线程接管后, 会回调的函数
-
static void ngx_http_block_request_thread_event_handler(ngx_event_t *ev)
-
{
-
ngx_http_block_request_ctx_t *ctx = ev->data; // ev即task->event->data,自行设置的值
-
ngx_http_request_t *r = ctx->r;
-
-
// 完成后,记得清理这些设置,不然,Nginx仍然当作还有数据要处理而继续等待,
-
// 不去响应客户端
-
r->connection->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
-
r->connection->write->delayed = 0;
-
// 由于线程的加入,扰乱了原有的执行流程,这里需要
-
// ngx_http_finalize_request()
-
// 来让服务器主动关闭连接
-
ngx_http_finalize_request(r, ngx_http_block_request_response(r, ctx->buf));
-
}
-
-
static ngx_int_t ngx_http_block_request_response(ngx_http_request_t *r, ngx_buf_t *buf)
-
{
-
ngx_chain_t out;
-
-
r->headers_out.status = NGX_HTTP_OK;
-
r->headers_out.content_type.len = sizeof("text/html") - 1;
-
r->headers_out.content_type.data = (u_char *) "text/html";
-
r->headers_out.content_length_n = (u_char *)(buf->last) - (u_char *)(buf->pos);
-
-
ngx_http_send_header(r);
-
-
out.buf = buf;
-
out.next = NULL;
-
buf->memory = 1;
-
buf->last_buf = 1;
-
return ngx_http_output_filter(r, &out);
-
}
//
file: config
-
ngx_addon_name="ngx_http_block_request_module"
-
HTTP_MODULES="$HTTP_MODULES ngx_http_block_request_module"
-
NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_addon_dir/ngx_http_block_request_module.c"
//
file: cong/nginx.conf的相关配置(片段)
-
...
-
thread_pool blk_thread_pool threads=1; # 定义线程池
-
...
-
location / {
-
root html;
-
block_request ;
-
index index.html index.htm;
-
}
-
...
阅读(1513) | 评论(1) | 转发(0) |