随笔拿一个nginx的filter模块来看,gzip模块,来看它的初始化。
- static ngx_http_output_header_filter_pt ngx_http_next_header_filter;
- static ngx_http_output_body_filter_pt ngx_http_next_body_filter;
- static ngx_int_t
- ngx_http_gzip_filter_init(ngx_conf_t *cf)
- {
- ngx_http_next_header_filter = ngx_http_top_header_filter;
- ngx_http_top_header_filter = ngx_http_gzip_header_filter;
- ngx_http_next_body_filter = ngx_http_top_body_filter;
- ngx_http_top_body_filter = ngx_http_gzip_body_filter;
- return NGX_OK;
- }
static ngx_http_output_header_filter_pt ngx_http_next_header_filter;
static ngx_http_output_body_filter_pt ngx_http_next_body_filter;
static ngx_int_t
ngx_http_gzip_filter_init(ngx_conf_t *cf)
{
ngx_http_next_header_filter = ngx_http_top_header_filter;
ngx_http_top_header_filter = ngx_http_gzip_header_filter;
ngx_http_next_body_filter = ngx_http_top_body_filter;
ngx_http_top_body_filter = ngx_http_gzip_body_filter;
return NGX_OK;
}
这里nginx处理filter将所有的过滤器做成一个类似链表的东东,每次声明一个ngx_http_next_header_filter以及ngx_http_next_body_filter来保存当前的最前面的filter,然后再将自己的filter处理函数赋值给ngx_http_top_header_filter以及ngx_http_top_body_filter ,这样也就是说最后面初始化的filter反而是最早处理。
而在模块本身的filter处理函数中会调用ngx_http_next_header_filter,也就是当前filter插入前的那个最top上的filter处理函数。
然后我们来看nginx如何启动filter的调用。
先来看head_filter的调用:
- ngx_int_t
- ngx_http_send_header(ngx_http_request_t *r)
- {
- if (r->err_status) {
- r->headers_out.status = r->err_status;
- r->headers_out.status_line.len = 0;
- }
- return ngx_http_top_header_filter(r);
- }
ngx_int_t
ngx_http_send_header(ngx_http_request_t *r)
{
if (r->err_status) {
r->headers_out.status = r->err_status;
r->headers_out.status_line.len = 0;
}
return ngx_http_top_header_filter(r);
}
可以看到当发送header的时候就是调用ngx_http_top_header_filter,nginx这里把status这些也作为一个filter模块来处理的。当启动ngx_http_top_header_filter之后所有的filter处理函数就会象链表一样被一个个的调用。
然后是body filter的调用,这个和header的类似,因此就不解释了。
- ngx_int_t
- ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
- {
- ngx_int_t rc;
- ngx_connection_t *c;
- c = r->connection;
- ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
- "http output filter \"%V?%V\"", &r->uri, &r->args);
- rc = ngx_http_top_body_filter(r, in);
- ..............................................................
- return rc;
- }
ngx_int_t
ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_int_t rc;
ngx_connection_t *c;
c = r->connection;
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http output filter \"%V?%V\"", &r->uri, &r->args);
//启动body filter。
rc = ngx_http_top_body_filter(r, in);
..............................................................
return rc;
}
这里还有一个问题,那就是最后一个ngx_http_top_header_filter和ngx_http_top_body_filter是什么呢?也就是第一个被插入的filter。
先来看filter被初始化的地方。这里filter的初始化是在ngx_http_block函数中:
- for (m = 0; ngx_modules[m]; m++) {
- if (ngx_modules[m]->type != NGX_HTTP_MODULE) {
- continue;
- }
- module = ngx_modules[m]->ctx;
- if (module->postconfiguration) {
- if (module->postconfiguration(cf) != NGX_OK) {
- return NGX_CONF_ERROR;
- }
- }
- }
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_HTTP_MODULE) {
continue;
}
module = ngx_modules[m]->ctx;
//如果存在postconfiguratio则调用初始化。
if (module->postconfiguration) {
if (module->postconfiguration(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
}
代码很简单就是遍历ngx_modules然后调用初始化函数,而我们这里要找第一个filter,也就是ngx_modules中的第一个bodyfilter和header filter。
来看objs/ngx_modules.c中的ngx_module的定义:
- ngx_module_t *ngx_modules[] = {
- ..............................................................
- &ngx_http_write_filter_module,
- &ngx_http_header_filter_module,
- ........................................................................
- NULL
- };
ngx_module_t *ngx_modules[] = {
..............................................................
&ngx_http_write_filter_module,
&ngx_http_header_filter_module,
........................................................................
NULL
};
可以看到ngx_http_write_filter_module和ngx_http_header_filter_module分别是body filter和header filter的第一个初始化模块,也就是filter链中的最后一个模块。
接下来我们就来详细分析这两个模块,首先是ngx_http_write_filter_module模块。
这个模块的功能起始很简单,就是遍历chain,然后输出所有的数据,如果有设置flush的话刷新chain。
这里要注意ngx_http_request_t中有一个out的chain,这个chain保存的是上一次还没有被发完的buf,这样每次我们接收到新的chain的话,就需要将新的chain连接到老的out chain上,然后再发出去。
来看代码。
- ngx_int_t
- ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
ngx_int_t
ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *in)
第一个是request请求,第二个参数是输入的chain。
先来看初始化部分:
- off_t size, sent, nsent, limit;
- ngx_uint_t last, flush;
- ngx_msec_t delay;
- ngx_chain_t *cl, *ln, **ll, *chain;
- ngx_connection_t *c;
- ngx_http_core_loc_conf_t *clcf;
- c = r->connection;
- if (c->error) {
- return NGX_ERROR;
- }
- size = 0;
- flush = 0;
- last = 0;
- ll = &r->out;
off_t size, sent, nsent, limit;
ngx_uint_t last, flush;
ngx_msec_t delay;
ngx_chain_t *cl, *ln, **ll, *chain;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;
//得到当前所属的连接
c = r->connection;
if (c->error) {
return NGX_ERROR;
}
size = 0;
flush = 0;
last = 0;
//得到上次没有发送完毕的chain
ll = &r->out;
然后接下来这部分是校验并统计out chain,也就是上次没有完成的chain buf。
- for (cl = r->out; cl; cl = cl->next) {
- ll = &cl->next;
- #if 1
- if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
- ......................................................................
- ngx_debug_point();
- return NGX_ERROR;
- }
- #endif
- size += ngx_buf_size(cl->buf);
- if (cl->buf->flush || cl->buf->recycled) {
- flush = 1;
- }
- if (cl->buf->last_buf) {
- last = 1;
- }
- }
for (cl = r->out; cl; cl = cl->next) {
ll = &cl->next;
#if 1
//如果有0长度的buf则返回错误。
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
......................................................................
ngx_debug_point();
return NGX_ERROR;
}
#endif
//得到buf的大小
size += ngx_buf_size(cl->buf);
//看当传输完毕后是否要刷新buf。
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
//看是否是最后一个buf
if (cl->buf->last_buf) {
last = 1;
}
}
接下来这部分是用来链接新的chain到上面的out chain后面:
- for (ln = in; ln; ln = ln->next) {
- cl = ngx_alloc_chain_link(r->pool);
- if (cl == NULL) {
- return NGX_ERROR;
- }
- cl->buf = ln->buf;
- *ll = cl;
- ll = &cl->next;
- #if 1
- if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
- ngx_debug_point();
- return NGX_ERROR;
- }
- #endif
- size += ngx_buf_size(cl->buf);
- if (cl->buf->flush || cl->buf->recycled) {
- flush = 1;
- }
- if (cl->buf->last_buf) {
- last = 1;
- }
- }
for (ln = in; ln; ln = ln->next) {
//
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = ln->buf;
//前面的代码我们知道ll已经指向out chain的最后一个位置了,因此这里就是将新的chain链接到out chain的后面。
*ll = cl;
ll = &cl->next;
#if 1
//校验buf
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
ngx_debug_point();
return NGX_ERROR;
}
#endif
//计算大小
size += ngx_buf_size(cl->buf);
//判断是否需要flush
if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}
//判断是否是最后一个buf
if (cl->buf->last_buf) {
last = 1;
}
}
然后接下来的这段代码主要是对进行发送前buf的一些标记的处理。
在看代码之前先来解释下几个比较重要的标记。
第一个是ngx_http_core_module的conf的一个标记postpone_output(conf里面可以配置的),这个表示延迟输出的阀,也就是说将要发送的字节数如果小于这个的话,并且还有另外几个条件的话(下面会解释),就会直接返回不发送当前的chain。
第二个是c->write->delayed,这个表示当前的连接的写必须要被delay了,也就是说现在不能发送了(原因下面会解释),得等另外的地方取消了delayed才能发送,此时我们修改连接的buffered的标记,然后返回NGX_AGAIN.
第三个是c->buffered,因为有时buf并没有发完,因此我们有时就会设置buffed标记,而我们可能会在多个filter模块中被buffered,因此下面就是buffered的类型。
- #define NGX_HTTP_LOWLEVEL_BUFFERED 0xf0
- #define NGX_HTTP_WRITE_BUFFERED 0x10
- #define NGX_LOWLEVEL_BUFFERED 0x0f
- #define NGX_HTTP_GZIP_BUFFERED 0x20
- #define NGX_HTTP_SSI_BUFFERED 0x01
- #define NGX_HTTP_SUB_BUFFERED 0x02
- #define NGX_HTTP_COPY_BUFFERED 0x04
//这个并没有用到
#define NGX_HTTP_LOWLEVEL_BUFFERED 0xf0
//主要是这个,这个表示在最终的write filter中被buffered
#define NGX_HTTP_WRITE_BUFFERED 0x10
//判断是否有被设置
#define NGX_LOWLEVEL_BUFFERED 0x0f
//下面几个filter中被buffered
#define NGX_HTTP_GZIP_BUFFERED 0x20
#define NGX_HTTP_SSI_BUFFERED 0x01
#define NGX_HTTP_SUB_BUFFERED 0x02
#define NGX_HTTP_COPY_BUFFERED 0x04
然后我们来看第二个的意思,这个表示当前的chain已经被buffered了,
第四个是r->limit_rate,这个表示当前的request的发送限制速率,这个也是在nginx.conf中配置的,而一般就是通过这个值来设置c->write->delayed的。也就是说如果发送速率大于这个limit了的话,就设置delayed,然后这边的request就会延迟发送,下面我们的代码会看到nginx如何处理。
- clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
- if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
- return NGX_OK;
- }
- if (c->write->delayed) {
- c->buffered |= NGX_HTTP_WRITE_BUFFERED;
- return NGX_AGAIN;
- }
- if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
- if (last) {
- r->out = NULL;
- c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
- return NGX_OK;
- }
- if (flush) {
- do {
- r->out = r->out->next;
- } while (r->out);
- c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
- return NGX_OK;
- }
- ngx_log_error(NGX_LOG_ALERT, c->log, 0,
- "the http output chain is empty");
- ngx_debug_point();
- return NGX_ERROR;
- }
- if (r->limit_rate) {
- limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
- - (c->sent - clcf->limit_rate_after);
- if (limit <= 0) {
- c->write->delayed = 1;
- ngx_add_timer(c->write,
- (ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));
- c->buffered |= NGX_HTTP_WRITE_BUFFERED;
- return NGX_AGAIN;
- }
- } else if (clcf->sendfile_max_chunk) {
- limit = clcf->sendfile_max_chunk;
- } else {
- limit = 0;
- }
- sent = c->sent;
clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
//也就是说将要发送的字节数小于postpone_output并且不是最后一个buf,并且不需要刷新chain的话,就直接返回。
if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
return NGX_OK;
}
///如果设置了write的delayed,则设置标记。
if (c->write->delayed) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
//如果size为0,并且没有设置buffered标记,则进入清理工作。
if (size == 0 && !(c->buffered & NGX_LOWLEVEL_BUFFERED)) {
//如果是最后一个buf,则清理buffered标记然后清理out chain
if (last) {
r->out = NULL;
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
//如果有设置flush的话,则会强行传输当前buf之前的所有buf,因此这里就需要清理out chain。
if (flush) {
do {
r->out = r->out->next;
} while (r->out);
//清理buf 标记
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
"the http output chain is empty");
ngx_debug_point();
return NGX_ERROR;
}
//如果有发送速率限制。
if (r->limit_rate) {
//计算是否有超过速率限制
limit = r->limit_rate * (ngx_time() - r->start_sec + 1)
- (c->sent - clcf->limit_rate_after);
//如果有
if (limit <= 0) {
//设置delayed标记
c->write->delayed = 1;
//设置定时器
ngx_add_timer(c->write,
(ngx_msec_t) (- limit * 1000 / r->limit_rate + 1));
//设置buffered。
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
} else if (clcf->sendfile_max_chunk) {
//sendfile所用到的limit。
limit = clcf->sendfile_max_chunk;
} else {
limit = 0;
}
sent = c->sent;
然后接下来这段就是发送buf,以及发送完的处理部分。这里要注意send_chain返回值为还没有发送完的chain,这个函数我后面的blog会详细的分析的。
- chain = c->send_chain(c, r->out, limit);
- ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
- "http write filter %p", chain);
- if (chain == NGX_CHAIN_ERROR) {
- c->error = 1;
- return NGX_ERROR;
- }
- if (r->limit_rate) {
- nsent = c->sent;
- if (clcf->limit_rate_after) {
- sent -= clcf->limit_rate_after;
- if (sent < 0) {
- sent = 0;
- }
- nsent -= clcf->limit_rate_after;
- if (nsent < 0) {
- nsent = 0;
- }
- }
- delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate + 1);
- if (delay > 0) {
- c->write->delayed = 1;
- ngx_add_timer(c->write, delay);
- }
- } else if (c->write->ready
- && clcf->sendfile_max_chunk
- && (size_t) (c->sent - sent)
- >= clcf->sendfile_max_chunk - 2 * ngx_pagesize)
- {
- c->write->delayed = 1;
- ngx_add_timer(c->write, 1);
- }
- for (cl = r->out; cl && cl != chain; ) {
- ln = cl;
- cl = cl->next;
- ngx_free_chain(r->pool, ln);
- }
- r->out = chain;
- if (chain) {
- c->buffered |= NGX_HTTP_WRITE_BUFFERED;
- return NGX_AGAIN;
- }
- c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
- if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
- return NGX_AGAIN;
- }
- return NGX_OK;
//调用发送函数。
chain = c->send_chain(c, r->out, limit);
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http write filter %p", chain);
if (chain == NGX_CHAIN_ERROR) {
c->error = 1;
return NGX_ERROR;
}
//控制imit_rate,这个值一般是在nginx.conf中配置的。
if (r->limit_rate) {
nsent = c->sent;
if (clcf->limit_rate_after) {
sent -= clcf->limit_rate_after;
if (sent < 0) {
sent = 0;
}
nsent -= clcf->limit_rate_after;
if (nsent < 0) {
nsent = 0;
}
}
delay = (ngx_msec_t) ((nsent - sent) * 1000 / r->limit_rate + 1);
if (delay > 0) {
c->write->delayed = 1;
ngx_add_timer(c->write, delay);
}
} else if (c->write->ready
&& clcf->sendfile_max_chunk
&& (size_t) (c->sent - sent)
>= clcf->sendfile_max_chunk - 2 * ngx_pagesize)
{
c->write->delayed = 1;
ngx_add_timer(c->write, 1);
}
//开始遍历上一次还没有传输完毕的chain,如果这次没有传完的里面还有的话,就跳出循环,否则free这个chain
for (cl = r->out; cl && cl != chain; /* void */) {
ln = cl;
cl = cl->next;
ngx_free_chain(r->pool, ln);
}
///out chain赋值
r->out = chain;
//如果chain存在,则设置buffered并且返回again。
if (chain) {
c->buffered |= NGX_HTTP_WRITE_BUFFERED;
return NGX_AGAIN;
}
//否则清理buffered
c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;
//如果有其他的filter buffered并且postponed被设置了,则我们返回again,也就是还有buf要处理。
if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
return NGX_AGAIN;
}
//否则返回ok
return NGX_OK;
然后我们来看ngx_http_header_filter_module模块,这个模块的处理函数是ngx_http_header_filter。这个函数最终还是会调用ngx_http_write_filter来将head输出。
这个函数主要就是处理http的头域,然后设置对应的reponse值,最终输出。
这里header filter比较简单,这里没有什么复杂的东西,主要就是设置一些status。然后拷贝,最后通过ngx_http_write_filter进行发送。