公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议;
在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
废话少说,直接上代码:
注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;
nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:
- CPP = g++
-
LINK = \$(CPP) ##采用g++来链接
- ##line=338 below was changed by kevin_zhong on 2011-11-14
-
-
ngx_obj=`echo $ngx_obj \
-
| sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
-
-e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
-
-e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
-
-e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`
-
-
ngx_post_suffix=`echo $ngx_src \
-
| sed -e "s#^.*\(\.c\)\\$#\1#g" \
-
-e "s#^.*\(\.cc\)\\$#\1#g" \
-
-e "s#^.*\(\.cpp\)\\$#\1#g"`
-
-
if [ "$ngx_post_suffix"x = ".cpp"x ];then
-
ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
-
else
-
ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
-
fi
上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;
下面是具体代码分析:
- /*
-
* Copyright (C) Igor Sysoev; kevin_zhong
-
* mail: qq2000zhong@gmail.com
-
* date: 2011-11-13
-
*/
-
-
//因是cpp文件,固包含c头文件需要 extern c
-
extern "C" {
-
#include <ngx_config.h>
-
#include <ngx_core.h>
-
#include <ngx_http.h>
-
#include "ngx_chain_util.h"
-
}
-
-
//与服务器内部通信二进制协议实现
-
#include "ngx_thrift_transport.h"
-
#include "ngx_xdrive_datagram.h"
-
#include "protocal/rc_updator_types.h"
-
-
using namespace xdrive::msg::rc_updator;
-
using namespace xdrive;
-
-
/*
-
* 扩展模块需要3个业务相关输入变量,uid,path,recusive
-
* 参考nginx.conf中的配置写法
-
*/
-
-
typedef struct
-
{
-
ngx_http_upstream_conf_t upstream;
-
-
//将uid和path以及recusive在配置中的index找出来,以后create request的时候需要
-
ngx_int_t uid_index;
-
ngx_int_t path_index;
-
ngx_int_t recusive_index;
-
}
-
ngx_http_xdrive_rc_loc_conf_t;
-
-
/*
-
* 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:
-
*
-
* 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,
-
* 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接
-
* 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,
-
* 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;
-
*
-
* 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)
-
* 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client
-
*
-
* 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;
-
*/
-
-
typedef struct
-
{
-
ngx_http_request_t *request;
-
ngx_chain_pair_t body_buff;
-
ngx_chain_t * tail_buff;
-
uint64_t uid;
-
ngx_str_t path;
-
bool recusive;
-
-
//后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的
-
//开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n
-
//upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确
-
//的 content len,导致接受http包体数据异常...
-
//参考 ngx_http_upstream.c:2391
-
int rest_length;
-
}
-
ngx_http_xdrive_rc_ctx_t;
-
-
-
static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);
-
static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);
-
static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);
-
static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);
-
static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);
-
static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);
-
static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);
-
static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);
-
-
static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);
-
static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
-
-
static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
-
-
-
static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {
-
{ ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
-
{ ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
-
{ ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
-
{ ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },
-
{ ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
-
{ ngx_null_string, 0 }
-
};
-
-
/*
-
* 参数设置,不可变,注意和变量的区别
-
*/
-
static ngx_command_t ngx_http_xdrive_rc_commands[] = {
-
{ ngx_string("xdrive_rc_pass"),
-
NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
-
ngx_http_xdrive_rc_pass,
-
NGX_HTTP_LOC_CONF_OFFSET,
-
0,
-
NULL },
-
-
{ ngx_string("xdrive_rc_connect_timeout"),
-
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
-
ngx_conf_set_msec_slot,
-
NGX_HTTP_LOC_CONF_OFFSET,
-
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),
-
NULL },
-
-
{ ngx_string("xdrive_rc_send_timeout"),
-
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
-
ngx_conf_set_msec_slot,
-
NGX_HTTP_LOC_CONF_OFFSET,
-
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),
-
NULL },
-
-
{ ngx_string("xdrive_rc_buffer_size"),
-
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
-
ngx_conf_set_size_slot,
-
NGX_HTTP_LOC_CONF_OFFSET,
-
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),
-
NULL },
-
-
{ ngx_string("xdrive_rc_read_timeout"),
-
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
-
ngx_conf_set_msec_slot,
-
NGX_HTTP_LOC_CONF_OFFSET,
-
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),
-
NULL },
-
-
{ ngx_string("xdrive_rc_next_upstream"),
-
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,
-
ngx_conf_set_bitmask_slot,
-
NGX_HTTP_LOC_CONF_OFFSET,
-
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),
-
&ngx_http_xdrive_rc_next_upstream_masks },
-
-
ngx_null_command
-
};
-
-
-
static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {
-
ngx_http_xdrive_rc_add_variables, /* preconfiguration */
-
NULL, /* postconfiguration */
-
-
NULL, /* create main configuration */
-
NULL, /* init main configuration */
-
-
NULL, /* create server configuration */
-
NULL, /* merge server configuration */
-
-
ngx_http_xdrive_rc_create_loc_conf, /* create location configration */
-
ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */
-
};
-
-
-
ngx_module_t ngx_http_xdrive_rc_module = {
-
NGX_MODULE_V1,
-
&ngx_http_xdrive_rc_module_ctx, /* module context */
-
ngx_http_xdrive_rc_commands, /* module directives */
-
NGX_HTTP_MODULE, /* module type */
-
NULL, /* init master */
-
NULL, /* init module */
-
NULL, /* init process */
-
NULL, /* init thread */
-
NULL, /* exit thread */
-
NULL, /* exit process */
-
NULL, /* exit master */
-
NGX_MODULE_V1_PADDING
-
};
-
-
//业务相关变量,get_handler = NULL,因为这三个是从conf里面通过
-
//正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了
-
//因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果
-
//业务需要uid放在 query_string,这时候就只需要改配置即可了
-
//思路来源于 ngx_http_memcached_module.c
-
-
static ngx_http_variable_t ngx_http_proxy_vars[] = {
-
{ ngx_string("uid"), NULL,
-
NULL, 0,
-
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
-
0 },
-
{ ngx_string("path"), NULL,
-
NULL, 0,
-
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
-
0 },
-
{ ngx_string("recusive"), NULL,
-
NULL, 0,
-
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
-
0 },
-
{ ngx_null_string, NULL,NULL,0, 0, 0 }
-
};
-
-
-
static ngx_int_t
-
ngx_http_xdrive_rc_handler(ngx_http_request_t *r)
-
{
-
ngx_int_t rc;
-
ngx_http_upstream_t *u;
-
ngx_http_xdrive_rc_ctx_t *ctx;
-
ngx_http_xdrive_rc_loc_conf_t *mlcf;
-
-
if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))
-
{
-
return NGX_HTTP_NOT_ALLOWED;
-
}
-
-
//get 请求,不需要包体
-
rc = ngx_http_discard_request_body(r);
-
-
if (rc != NGX_OK)
-
{
-
return rc;
-
}
-
-
if (ngx_http_set_content_type(r) != NGX_OK)
-
{
-
return NGX_HTTP_INTERNAL_SERVER_ERROR;
-
}
-
-
if (ngx_http_upstream_create(r) != NGX_OK)
-
{
-
return NGX_HTTP_INTERNAL_SERVER_ERROR;
-
}
-
-
u = r->upstream;
-
-
ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用
-
-
u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;
-
-
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
-
-
u->conf = &mlcf->upstream;
-
-
//设置回调,网上大都只讲这里
-
u->create_request = ngx_http_xdrive_rc_create_request;
-
u->reinit_request = ngx_http_xdrive_rc_reinit_request;
-
u->process_header = ngx_http_xdrive_rc_process_header;
-
u->abort_request = ngx_http_xdrive_rc_abort_request;
-
u->finalize_request = ngx_http_xdrive_rc_finalize_request;
-
-
//分配context内存
-
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));
-
if (ctx == NULL)
-
{
-
return NGX_HTTP_INTERNAL_SERVER_ERROR;
-
}
-
ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));
-
-
ctx->request = r;
-
-
ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);
-
-
u->input_filter_init = ngx_http_xdrive_rc_filter_init;
-
-
/*
-
* 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的
-
* input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认
-
* filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们
-
* 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client
-
*/
-
u->input_filter = ngx_http_xdrive_rc_filter;
-
u->input_filter_ctx = ctx;
-
-
u->buffering = 0; //note, no buffering...cause too complicated !!
-
-
r->main->count++;
-
-
//不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要
-
//调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
-
ngx_http_upstream_init(r);
-
-
return NGX_DONE;
-
}
-
-
-
static ngx_int_t
-
ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)
-
{
-
size_t len;
-
ngx_buf_t *b;
-
ngx_chain_t *cl;
-
ngx_http_xdrive_rc_ctx_t *ctx;
-
ngx_http_variable_value_t *vv;
-
ngx_http_xdrive_rc_loc_conf_t *mlcf;
-
-
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
-
-
//根据配置文件uid index号从变量中获取uid的变量值
-
vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);
-
-
if (vv == NULL || vv->not_found || vv->len == 0)
-
{
-
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
-
"the \"$uid\" variable is not set");
-
return NGX_ERROR;
-
}
-
-
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
-
ctx->uid = ngx_atoof(vv->data, vv->len);
-
if (ctx->uid == (off_t)NGX_ERROR)
-
{
-
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
-
"the \"$uid\" variable is err %s set", vv->data);
-
return NGX_ERROR;
-
}
-
-
//根据配置文件path index号从变量中获取path的变量值
-
vv = ngx_http_get_indexed_variable(r, mlcf->path_index);
-
if (vv == NULL || vv->not_found || vv->len == 0)
-
{
-
ngx_str_set(&ctx->path, "/");
-
}
-
else {
-
ctx->path.data = vv->data;
-
ctx->path.len = vv->len;
-
}
-
-
vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);
-
if (vv == NULL || vv->not_found || vv->len == 0)
-
{
-
ctx->recusive = false;
-
}
-
else {
-
ctx->recusive = ngx_atoi(vv->data, vv->len);
-
}
-
-
RcUpdateReq list_req;
-
list_req._user_id = ctx->uid;
-
list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);
-
list_req._recursive = ctx->recusive;
-
-
static uint32_t seq = ngx_time();
-
-
//编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;
-
//细节见具体代码,不表
-
cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,
-
&list_req, ++seq, 0xC01);
-
if (cl == NULL)
-
return NGX_ERROR;
-
-
//准备发送
-
r->upstream->request_bufs = cl;
-
-
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
-
"http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",
-
ctx->uid, &ctx->path, ctx->recusive);
-
-
return NGX_OK;
-
}
-
-
-
static ngx_int_t
-
ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)
-
{
-
return NGX_OK;
-
}
-
-
/*
-
* 读取二进制包体头部
-
*/
-
static ngx_int_t
-
ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)
-
{
-
ngx_http_upstream_t *u;
-
ngx_http_xdrive_rc_ctx_t *ctx;
-
-
u = r->upstream;
-
-
//因包头固定长度,所以很好判断
-
if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)
-
return NGX_AGAIN;
-
-
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
-
-
ngx_xdrive_datagram_header_t header;
-
//解包头,获取最重要参数 : 包体长度,根据包体长度收包
-
if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,
-
&header, r->connection->log) != NGX_OK)
-
{
-
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
-
}
-
-
//业务代码
-
if (header._type != 0x08C01)
-
{
-
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
-
"xdrive_rc ret type not legal = %d", header._type);
-
-
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
-
}
-
-
//业务代码
-
if (header._status != 0)
-
{
-
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
-
"xdrive_rc ret status not ok in response = %d", header._status);
-
-
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
-
}
-
-
//非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;
-
ngx_http_clear_content_length(r);
-
-
//因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)
-
//所以我们必须自己处理记录剩余包体长度;
-
ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;
-
-
u->headers_in.status_n = NGX_HTTP_OK;
-
u->state->status = NGX_HTTP_OK;
-
-
//包头数据已经处理完毕,必须丢弃;
-
u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;
-
-
return NGX_OK;
-
}
-
-
-
//其实没啥用
-
static ngx_int_t
-
ngx_http_xdrive_rc_filter_init(void *data)
-
{
-
ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
-
-
ngx_http_upstream_t *u;
-
-
u = ctx->request->upstream;
-
-
return NGX_OK;
-
}
-
-
/*
-
* 缓存包体,等待包体接受完毕,解码,然后一次回复给client
-
*/
-
static ngx_int_t
-
ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)
-
{
-
ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
-
-
u_char *last;
-
ngx_buf_t *b;
-
ngx_chain_t *cl, **ll;
-
ngx_http_upstream_t *u;
-
-
ngx_http_xdrive_rc_loc_conf_t *mlcf;
-
-
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)
-
ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);
-
-
u = ctx->request->upstream;
-
b = &u->buffer;
-
-
size_t buff_size = mlcf->upstream.buffer_size;
-
//assert(bytes <= buff_size);
-
-
ctx->rest_length -= bytes;
-
-
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
-
"recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);
-
-
//特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候
-
//直接交互内存即可,不再需要内存拷贝,否则...
-
if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)
-
{
-
cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
-
ctx->body_buff._chain_head = cl;
-
-
cl->buf->flush = 1;
-
cl->buf->memory = 1;
-
-
last = b->last;
-
cl->buf->pos = last;
-
b->last += bytes;
-
cl->buf->last = b->last;
-
cl->buf->tag = u->output.tag;
-
}
-
else {
-
//做一次内存拷贝到 body buf 中去
-
if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,
-
b->last, bytes) != NGX_OK)
-
return NGX_ERROR;
-
-
b->last += bytes;
-
}
-
-
//判断upstream包体是否收完整
-
if (ctx->rest_length > 0)
-
{
-
return NGX_OK;
-
}
-
-
//包体收完,进行解码
-
RcUpdateResp list_resp;
-
if (ngx_datagram_decode_body(ctx->body_buff._chain_head,
-
ctx->request->connection->log,
-
&list_resp) != NGX_OK)
-
{
-
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
-
"xdrive_rc RcUpdateResp decode failed");
-
-
return NGX_ERROR;
-
}
-
-
ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,
-
"xdrive_rc RcUpdateResp list num=%d",
-
list_resp._action_list.size());
-
-
//内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用
-
ngx_chain_t *busy_bufs = NULL;
-
ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);
-
-
//transfer...
-
ngx_chain_pair_t chain_pair;
-
ngx_memzero(&chain_pair, sizeof(chain_pair));
-
-
//转成 json 格式
-
if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
-
"uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",
-
ctx->uid, &ctx->path, ctx->recusive,
-
list_resp._weak_dcid.c_str(),
-
list_resp._used_space,
-
list_resp._action_list.size()
-
))
-
return NGX_ERROR;
-
-
//转成 json 格式
-
for (size_t i = 0; i < list_resp._action_list.size(); ++i)
-
{
-
ActionThrft *ac = &list_resp._action_list[i];
-
if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
-
"[path=\"%s\", node_type=%d, status=%d, gcid=%s, size=%d]\n",
-
ac->m_path.c_str(), ac->m_node_type, ac->m_status,
-
ac->m_gcid.c_str(), ac->m_file_size
-
))
-
return NGX_ERROR;
-
}
-
-
//这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕
-
//关后端连接,回前端包
-
chain_pair._chain_last->buf->last_buf = 1;
-
-
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)
-
{
-
ll = &cl->next;
-
}
-
*ll = chain_pair._chain_head;
-
-
return NGX_OK;
-
}
-
-
-
static void
-
ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)
-
{
-
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
-
"abort http xdrive_rc request");
-
return;
-
}
-
-
-
static void
-
ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
-
{
-
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
-
"finalize http xdrive_rc request");
-
return;
-
}
-
-
-
static void *
-
ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)
-
{
-
ngx_http_xdrive_rc_loc_conf_t *conf;
-
-
conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,
-
sizeof(ngx_http_xdrive_rc_loc_conf_t));
-
if (conf == NULL)
-
{
-
return NULL;
-
}
-
-
conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
-
conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
-
conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;
-
-
conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;
-
-
/* the hardcoded values */
-
conf->upstream.cyclic_temp_file = 0;
-
conf->upstream.buffering = 0;
-
conf->upstream.ignore_client_abort = 0;
-
conf->upstream.send_lowat = 0;
-
conf->upstream.bufs.num = 0;
-
conf->upstream.busy_buffers_size = 0;
-
conf->upstream.max_temp_file_size = 0;
-
conf->upstream.temp_file_write_size = 0;
-
conf->upstream.intercept_errors = 1;
-
conf->upstream.intercept_404 = 1;
-
conf->upstream.pass_request_headers = 0;
-
conf->upstream.pass_request_body = 0;
-
-
conf->uid_index = NGX_CONF_UNSET;
-
conf->path_index = NGX_CONF_UNSET;
-
conf->recusive_index = NGX_CONF_UNSET;
-
-
return conf;
-
}
-
-
-
static char *
-
ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
-
{
-
ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;
-
ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;
-
-
ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
-
prev->upstream.connect_timeout, 60000);
-
-
ngx_conf_merge_msec_value(conf->upstream.send_timeout,
-
prev->upstream.send_timeout, 60000);
-
-
ngx_conf_merge_msec_value(conf->upstream.read_timeout,
-
prev->upstream.read_timeout, 60000);
-
-
ngx_conf_merge_size_value(conf->upstream.buffer_size,
-
prev->upstream.buffer_size,
-
(size_t)ngx_pagesize);
-
-
ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
-
prev->upstream.next_upstream,
-
(NGX_CONF_BITMASK_SET
-
| NGX_HTTP_UPSTREAM_FT_ERROR
-
| NGX_HTTP_UPSTREAM_FT_TIMEOUT));
-
-
if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)
-
{
-
conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
-
| NGX_HTTP_UPSTREAM_FT_OFF;
-
}
-
-
if (conf->upstream.upstream == NULL)
-
{
-
conf->upstream.upstream = prev->upstream.upstream;
-
}
-
-
if (conf->uid_index == NGX_CONF_UNSET) {
-
conf->uid_index = prev->uid_index;
-
}
-
if (conf->path_index == NGX_CONF_UNSET) {
-
conf->path_index = prev->path_index;
-
}
-
if (conf->recusive_index == NGX_CONF_UNSET) {
-
conf->recusive_index = prev->recusive_index;
-
}
-
-
return NGX_CONF_OK;
-
}
-
-
-
static char *
-
ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
-
{
-
ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;
-
-
ngx_str_t *value;
-
ngx_url_t u;
-
ngx_http_core_loc_conf_t *clcf;
-
-
if (mlcf->upstream.upstream)
-
{
-
return "is duplicate";
-
}
-
-
value = (ngx_str_t *)cf->args->elts;
-
-
ngx_memzero(&u, sizeof(ngx_url_t));
-
-
u.url = value[1];
-
u.no_resolve = 1;
-
-
mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
-
if (mlcf->upstream.upstream == NULL)
-
{
-
return (char *)(NGX_CONF_ERROR);
-
}
-
-
clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
-
-
clcf->handler = ngx_http_xdrive_rc_handler;
-
-
if (clcf->name.data[clcf->name.len - 1] == '/')
-
{
-
clcf->auto_redirect = 1;
-
}
-
-
//保存变量index用
-
mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[0].name);
-
if (mlcf->uid_index == NGX_ERROR)
-
{
-
return (char *)(NGX_CONF_ERROR);
-
}
-
mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[1].name);
-
if (mlcf->path_index == NGX_ERROR)
-
{
-
return (char *)(NGX_CONF_ERROR);
-
}
-
mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[2].name);
-
if (mlcf->recusive_index == NGX_ERROR)
-
{
-
return (char *)(NGX_CONF_ERROR);
-
}
-
-
return NGX_CONF_OK;
-
}
-
-
-
-
static ngx_int_t
-
ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)
-
{
-
ngx_http_variable_t *var, *v;
-
-
for (v = ngx_http_proxy_vars; v->name.len; v++)
-
{
-
var = ngx_http_add_variable(cf, &v->name, v->flags);
-
if (var == NULL)
-
{
-
return NGX_ERROR;
-
}
-
-
var->get_handler = v->get_handler;
-
var->data = v->data;
-
}
-
-
return NGX_OK;
-
}
代码中一些有意思的地方:
- //和buf差不多的思想的 buf chain
- typedef struct
- {
- ngx_chain_t* _chain_head;
- ngx_chain_t* _chain_pos;
- ngx_chain_t* _chain_last;
- ngx_chain_t* _chain_tail;
- }
- ngx_chain_pair_t;
- //从buf chain中读取len长内存出来
- size_t ngx_cdecl
-
ngx_chain_read(ngx_chain_pair_t* chain_pair
-
, uint8_t *buf, uint32_t len);
//将buf写入到buf chain中
-
ngx_int_t ngx_cdecl
-
ngx_chain_write(ngx_pool_t* pool
-
, ngx_chain_t** free_bufs
-
, ngx_chain_pair_t* chain_pair
-
, size_t write_chunk_size
-
, const uint8_t *buf, uint32_t len);
//写json或者xml之类回复有用
-
ngx_int_t ngx_cdecl
-
ngx_chain_sprintf(ngx_pool_t *pool
-
, ngx_chain_t **free_bufs
-
, ngx_chain_pair_t *chain_pair
-
, size_t write_chunk_size
-
, const char *fmt, ...);
下面是nginx配置文件中的关键部分
- location ~* /rc_list/([0-9]+).html$ {
-
xdrive_rc_buffer_size 4096;
-
set $uid $1;
-
set $path /;
-
set $recusive 0;
-
if ($query_string ~* (|&)recusive=(0|1)(|&)) {
-
set $recusive $2;
-
}
-
xdrive_rc_pass 127.0.0.1:11001;
-
}
- 解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
- recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
- 思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;
阅读(12298) | 评论(1) | 转发(1) |