Chinaunix首页 | 论坛 | 博客
  • 博客访问: 510715
  • 博文数量: 78
  • 博客积分: 995
  • 博客等级: 准尉
  • 技术积分: 1462
  • 用 户 组: 普通用户
  • 注册时间: 2011-11-15 20:22
个人简介

技术中沉思的时候最快乐,问题得到完美解决的时候最有成就感!

文章分类

全部博文(78)

文章存档

2013年(39)

2012年(37)

2011年(2)

分类: LINUX

2011-11-15 20:31:29

      公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
      在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议
      在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
      看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
      废话少说,直接上代码:
       xdrive.rar   
      注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;


      nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:

  1. CPP = g++
  2. LINK = \$(CPP) ##采用g++来链接
  1. ##line=338 below was changed by kevin_zhong on 2011-11-14

  2.         ngx_obj=`echo $ngx_obj \
  3.             | sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
  4.                   -e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
  5.                   -e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
  6.                   -e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`

  7.         ngx_post_suffix=`echo $ngx_src \
  8.             | sed -e "s#^.*\(\.c\)\\$#\1#g" \
  9.                  -e "s#^.*\(\.cc\)\\$#\1#g" \
  10.                  -e "s#^.*\(\.cpp\)\\$#\1#g"`

  11.         if [ "$ngx_post_suffix"x = ".cpp"x ];then
  12.             ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
  13.         else
  14.             ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
  15.         fi
上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;


下面是具体代码分析:
  1. /*
  2.  * Copyright (C) Igor Sysoev; kevin_zhong
  3.  * mail: qq2000zhong@gmail.com
  4.  * date: 2011-11-13
  5.  */

  6. //因是cpp文件,固包含c头文件需要 extern c
  7. extern "C" {
  8. #include <ngx_config.h>
  9. #include <ngx_core.h>
  10. #include <ngx_http.h>
  11. #include "ngx_chain_util.h"
  12. }

  13. //与服务器内部通信二进制协议实现
  14. #include "ngx_thrift_transport.h"
  15. #include "ngx_xdrive_datagram.h"
  16. #include "protocal/rc_updator_types.h"

  17. using namespace xdrive::msg::rc_updator;
  18. using namespace xdrive;

  19. /*
  20. * 扩展模块需要3个业务相关输入变量,uid,path,recusive
  21. * 参考nginx.conf中的配置写法
  22. */

  23. typedef struct
  24. {
  25.         ngx_http_upstream_conf_t upstream;

  26.         //将uid和path以及recusive在配置中的index找出来,以后create request的时候需要
  27.         ngx_int_t uid_index;
  28.         ngx_int_t path_index;
  29.         ngx_int_t recusive_index;
  30. }
  31. ngx_http_xdrive_rc_loc_conf_t;

  32. /*
  33. * 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:
  34. *
  35. * 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,
  36. * 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接
  37. * 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,
  38. * 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;
  39. *
  40. * 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)
  41. * 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client
  42. *
  43. * 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;
  44. */

  45. typedef struct
  46. {
  47.         ngx_http_request_t *request;
  48.         ngx_chain_pair_t body_buff;
  49.         ngx_chain_t * tail_buff;
  50.         uint64_t uid;
  51.         ngx_str_t path;
  52.         bool recusive;

  53.         //后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的
  54.         //开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n
  55.         //upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确
  56.         //的 content len,导致接受http包体数据异常...
  57.         //参考 ngx_http_upstream.c:2391
  58.         int rest_length;
  59. }
  60. ngx_http_xdrive_rc_ctx_t;


  61. static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);
  62. static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);
  63. static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);
  64. static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);
  65. static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);
  66. static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);
  67. static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);
  68. static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

  69. static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);
  70. static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);

  71. static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);


  72. static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {
  73.         { ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
  74.         { ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
  75.         { ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
  76.         { ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },
  77.         { ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
  78.         { ngx_null_string, 0 }
  79. };

  80. /*
  81. * 参数设置,不可变,注意和变量的区别
  82. */
  83. static ngx_command_t ngx_http_xdrive_rc_commands[] = {
  84.         { ngx_string("xdrive_rc_pass"),
  85.           NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
  86.           ngx_http_xdrive_rc_pass,
  87.           NGX_HTTP_LOC_CONF_OFFSET,
  88.           0,
  89.           NULL },

  90.         { ngx_string("xdrive_rc_connect_timeout"),
  91.           NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
  92.           ngx_conf_set_msec_slot,
  93.           NGX_HTTP_LOC_CONF_OFFSET,
  94.           offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),
  95.           NULL },

  96.         { ngx_string("xdrive_rc_send_timeout"),
  97.           NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
  98.           ngx_conf_set_msec_slot,
  99.           NGX_HTTP_LOC_CONF_OFFSET,
  100.           offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),
  101.           NULL },

  102.         { ngx_string("xdrive_rc_buffer_size"),
  103.           NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
  104.           ngx_conf_set_size_slot,
  105.           NGX_HTTP_LOC_CONF_OFFSET,
  106.           offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),
  107.           NULL },

  108.         { ngx_string("xdrive_rc_read_timeout"),
  109.           NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
  110.           ngx_conf_set_msec_slot,
  111.           NGX_HTTP_LOC_CONF_OFFSET,
  112.           offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),
  113.           NULL },

  114.         { ngx_string("xdrive_rc_next_upstream"),
  115.           NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,
  116.           ngx_conf_set_bitmask_slot,
  117.           NGX_HTTP_LOC_CONF_OFFSET,
  118.           offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),
  119.           &ngx_http_xdrive_rc_next_upstream_masks },

  120.         ngx_null_command
  121. };


  122. static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {
  123.         ngx_http_xdrive_rc_add_variables, /* preconfiguration */
  124.         NULL, /* postconfiguration */

  125.         NULL, /* create main configuration */
  126.         NULL, /* init main configuration */

  127.         NULL, /* create server configuration */
  128.         NULL, /* merge server configuration */

  129.         ngx_http_xdrive_rc_create_loc_conf, /* create location configration */
  130.         ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */
  131. };


  132. ngx_module_t ngx_http_xdrive_rc_module = {
  133.         NGX_MODULE_V1,
  134.         &ngx_http_xdrive_rc_module_ctx, /* module context */
  135.         ngx_http_xdrive_rc_commands, /* module directives */
  136.         NGX_HTTP_MODULE, /* module type */
  137.         NULL, /* init master */
  138.         NULL, /* init module */
  139.         NULL, /* init process */
  140.         NULL, /* init thread */
  141.         NULL, /* exit thread */
  142.         NULL, /* exit process */
  143.         NULL, /* exit master */
  144.         NGX_MODULE_V1_PADDING
  145. };

  146. //业务相关变量,get_handler = NULL,因为这三个是从conf里面通过
  147. //正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了
  148. //因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果
  149. //业务需要uid放在 query_string,这时候就只需要改配置即可了
  150. //思路来源于 ngx_http_memcached_module.c

  151. static ngx_http_variable_t ngx_http_proxy_vars[] = {
  152.         { ngx_string("uid"), NULL,
  153.           NULL, 0,
  154.           NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
  155.           0 },
  156.         { ngx_string("path"), NULL,
  157.           NULL, 0,
  158.           NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
  159.           0 },
  160.         { ngx_string("recusive"), NULL,
  161.           NULL, 0,
  162.           NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
  163.           0 },
  164.         { ngx_null_string, NULL,NULL,0, 0, 0 }
  165. };


  166. static ngx_int_t
  167. ngx_http_xdrive_rc_handler(ngx_http_request_t *r)
  168. {
  169.         ngx_int_t rc;
  170.         ngx_http_upstream_t *u;
  171.         ngx_http_xdrive_rc_ctx_t *ctx;
  172.         ngx_http_xdrive_rc_loc_conf_t *mlcf;

  173.         if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))
  174.         {
  175.                 return NGX_HTTP_NOT_ALLOWED;
  176.         }

  177.         //get 请求,不需要包体
  178.         rc = ngx_http_discard_request_body(r);

  179.         if (rc != NGX_OK)
  180.         {
  181.                 return rc;
  182.         }

  183.         if (ngx_http_set_content_type(r) != NGX_OK)
  184.         {
  185.                 return NGX_HTTP_INTERNAL_SERVER_ERROR;
  186.         }

  187.         if (ngx_http_upstream_create(r) != NGX_OK)
  188.         {
  189.                 return NGX_HTTP_INTERNAL_SERVER_ERROR;
  190.         }

  191.         u = r->upstream;

  192.         ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用
  193.         
  194.         u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;

  195.         mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

  196.         u->conf = &mlcf->upstream;

  197.         //设置回调,网上大都只讲这里
  198.         u->create_request = ngx_http_xdrive_rc_create_request;
  199.         u->reinit_request = ngx_http_xdrive_rc_reinit_request;
  200.         u->process_header = ngx_http_xdrive_rc_process_header;
  201.         u->abort_request = ngx_http_xdrive_rc_abort_request;
  202.         u->finalize_request = ngx_http_xdrive_rc_finalize_request;

  203.         //分配context内存
  204.         ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));
  205.         if (ctx == NULL)
  206.         {
  207.                 return NGX_HTTP_INTERNAL_SERVER_ERROR;
  208.         }
  209.         ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));

  210.         ctx->request = r;

  211.         ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);

  212.         u->input_filter_init = ngx_http_xdrive_rc_filter_init;

  213.         /*
  214.         * 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的
  215.         * input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认
  216.         * filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们
  217.         * 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client
  218.         */
  219.         u->input_filter = ngx_http_xdrive_rc_filter;
  220.         u->input_filter_ctx = ctx;

  221.         u->buffering = 0; //note, no buffering...cause too complicated !!

  222.         r->main->count++;

  223.         //不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要
  224.         //调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
  225.         ngx_http_upstream_init(r);

  226.         return NGX_DONE;
  227. }


  228. static ngx_int_t
  229. ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)
  230. {
  231.         size_t len;
  232.         ngx_buf_t *b;
  233.         ngx_chain_t *cl;
  234.         ngx_http_xdrive_rc_ctx_t *ctx;
  235.         ngx_http_variable_value_t *vv;
  236.         ngx_http_xdrive_rc_loc_conf_t *mlcf;

  237.         mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);

  238.         //根据配置文件uid index号从变量中获取uid的变量值
  239.         vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);

  240.         if (vv == NULL || vv->not_found || vv->len == 0)
  241.         {
  242.                 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
  243.                               "the \"$uid\" variable is not set");
  244.                 return NGX_ERROR;
  245.         }

  246.         ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
  247.         ctx->uid = ngx_atoof(vv->data, vv->len);
  248.         if (ctx->uid == (off_t)NGX_ERROR)
  249.         {
  250.                 ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
  251.                               "the \"$uid\" variable is err %s set", vv->data);
  252.                 return NGX_ERROR;
  253.         }

  254.         //根据配置文件path index号从变量中获取path的变量值
  255.         vv = ngx_http_get_indexed_variable(r, mlcf->path_index);
  256.         if (vv == NULL || vv->not_found || vv->len == 0)
  257.         {
  258.                 ngx_str_set(&ctx->path, "/");
  259.         }
  260.         else {
  261.                 ctx->path.data = vv->data;
  262.                 ctx->path.len = vv->len;
  263.         }

  264.         vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);
  265.         if (vv == NULL || vv->not_found || vv->len == 0)
  266.         {
  267.                 ctx->recusive = false;
  268.         }
  269.         else {
  270.                 ctx->recusive = ngx_atoi(vv->data, vv->len);
  271.         }

  272.         RcUpdateReq list_req;
  273.         list_req._user_id = ctx->uid;
  274.         list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);
  275.         list_req._recursive = ctx->recusive;

  276.         static uint32_t seq = ngx_time();

  277.         //编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;
  278.         //细节见具体代码,不表
  279.         cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,
  280.                                  &list_req, ++seq, 0xC01);
  281.         if (cl == NULL)
  282.                 return NGX_ERROR;

  283.         //准备发送
  284.         r->upstream->request_bufs = cl;

  285.         ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
  286.                        "http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",
  287.                        ctx->uid, &ctx->path, ctx->recusive);

  288.         return NGX_OK;
  289. }


  290. static ngx_int_t
  291. ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)
  292. {
  293.         return NGX_OK;
  294. }

  295. /*
  296. * 读取二进制包体头部
  297. */
  298. static ngx_int_t
  299. ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)
  300. {
  301.         ngx_http_upstream_t *u;
  302.         ngx_http_xdrive_rc_ctx_t *ctx;

  303.         u = r->upstream;

  304.         //因包头固定长度,所以很好判断
  305.         if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)
  306.                 return NGX_AGAIN;

  307.         ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);

  308.         ngx_xdrive_datagram_header_t header;
  309.         //解包头,获取最重要参数 : 包体长度,根据包体长度收包
  310.         if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,
  311.                               &header, r->connection->log) != NGX_OK)
  312.         {
  313.                 return NGX_HTTP_UPSTREAM_INVALID_HEADER;
  314.         }

  315.         //业务代码
  316.         if (header._type != 0x08C01)
  317.         {
  318.                 ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
  319.                               "xdrive_rc ret type not legal = %d", header._type);

  320.                 return NGX_HTTP_UPSTREAM_INVALID_HEADER;
  321.         }

  322.         //业务代码
  323.         if (header._status != 0)
  324.         {
  325.                 ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
  326.                               "xdrive_rc ret status not ok in response = %d", header._status);

  327.                 return NGX_HTTP_UPSTREAM_INVALID_HEADER;
  328.         }

  329.         //非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;
  330.         ngx_http_clear_content_length(r);

  331.         //因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)
  332.         //所以我们必须自己处理记录剩余包体长度;
  333.         ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;

  334.         u->headers_in.status_n = NGX_HTTP_OK;
  335.         u->state->status = NGX_HTTP_OK;

  336.         //包头数据已经处理完毕,必须丢弃;
  337.         u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;

  338.         return NGX_OK;
  339. }


  340. //其实没啥用
  341. static ngx_int_t
  342. ngx_http_xdrive_rc_filter_init(void *data)
  343. {
  344.         ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

  345.         ngx_http_upstream_t *u;

  346.         u = ctx->request->upstream;

  347.         return NGX_OK;
  348. }

  349. /*
  350. * 缓存包体,等待包体接受完毕,解码,然后一次回复给client
  351. */
  352. static ngx_int_t
  353. ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)
  354. {
  355.         ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;

  356.         u_char *last;
  357.         ngx_buf_t *b;
  358.         ngx_chain_t *cl, **ll;
  359.         ngx_http_upstream_t *u;

  360.         ngx_http_xdrive_rc_loc_conf_t *mlcf;

  361.         mlcf = (ngx_http_xdrive_rc_loc_conf_t *)
  362.                ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);

  363.         u = ctx->request->upstream;
  364.         b = &u->buffer;

  365.         size_t buff_size = mlcf->upstream.buffer_size;
  366.         //assert(bytes <= buff_size);

  367.         ctx->rest_length -= bytes;

  368.         ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
  369.                        "recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);

  370.         //特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候
  371.         //直接交互内存即可,不再需要内存拷贝,否则...
  372.         if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)
  373.         {
  374.                 cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
  375.                 ctx->body_buff._chain_head = cl;

  376.                 cl->buf->flush = 1;
  377.                 cl->buf->memory = 1;

  378.                 last = b->last;
  379.                 cl->buf->pos = last;
  380.                 b->last += bytes;
  381.                 cl->buf->last = b->last;
  382.                 cl->buf->tag = u->output.tag;
  383.         }
  384.         else {
  385.                 //做一次内存拷贝到 body buf 中去
  386.                 if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,
  387.                                     b->last, bytes) != NGX_OK)
  388.                         return NGX_ERROR;

  389.                 b->last += bytes;
  390.         }

  391.         //判断upstream包体是否收完整
  392.         if (ctx->rest_length > 0)
  393.         {
  394.                 return NGX_OK;
  395.         }

  396.         //包体收完,进行解码
  397.         RcUpdateResp list_resp;
  398.         if (ngx_datagram_decode_body(ctx->body_buff._chain_head,
  399.                                      ctx->request->connection->log,
  400.                                      &list_resp) != NGX_OK)
  401.         {
  402.                 ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
  403.                               "xdrive_rc RcUpdateResp decode failed");

  404.                 return NGX_ERROR;
  405.         }

  406.         ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,
  407.                       "xdrive_rc RcUpdateResp list num=%d",
  408.                       list_resp._action_list.size());

  409.         //内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用
  410.         ngx_chain_t *busy_bufs = NULL;
  411.         ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);

  412.         //transfer...
  413.         ngx_chain_pair_t chain_pair;
  414.         ngx_memzero(&chain_pair, sizeof(chain_pair));

  415.         //转成 json 格式
  416.         if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
  417.                                         "uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",
  418.                                         ctx->uid, &ctx->path, ctx->recusive,
  419.                                         list_resp._weak_dcid.c_str(),
  420.                                         list_resp._used_space,
  421.                                         list_resp._action_list.size()
  422.                                         ))
  423.                 return NGX_ERROR;

  424.         //转成 json 格式
  425.         for (size_t i = 0; i < list_resp._action_list.size(); ++i)
  426.         {
  427.                 ActionThrft *ac = &list_resp._action_list[i];
  428.                 if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
  429.                                                 "[path=\"%s\", node_type=%d, status=%d, gcid=%s, size=%d]\n",
  430.                                                 ac->m_path.c_str(), ac->m_node_type, ac->m_status,
  431.                                                 ac->m_gcid.c_str(), ac->m_file_size
  432.                                                 ))
  433.                         return NGX_ERROR;
  434.         }

  435.         //这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕
  436.         //关后端连接,回前端包
  437.         chain_pair._chain_last->buf->last_buf = 1;

  438.         for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)
  439.         {
  440.                 ll = &cl->next;
  441.         }
  442.         *ll = chain_pair._chain_head;

  443.         return NGX_OK;
  444. }


  445. static void
  446. ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)
  447. {
  448.         ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
  449.                        "abort http xdrive_rc request");
  450.         return;
  451. }


  452. static void
  453. ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
  454. {
  455.         ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
  456.                        "finalize http xdrive_rc request");
  457.         return;
  458. }


  459. static void *
  460. ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)
  461. {
  462.         ngx_http_xdrive_rc_loc_conf_t *conf;

  463.         conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,
  464.                                                             sizeof(ngx_http_xdrive_rc_loc_conf_t));
  465.         if (conf == NULL)
  466.         {
  467.                 return NULL;
  468.         }

  469.         conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
  470.         conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
  471.         conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;

  472.         conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;

  473.         /* the hardcoded values */
  474.         conf->upstream.cyclic_temp_file = 0;
  475.         conf->upstream.buffering = 0;
  476.         conf->upstream.ignore_client_abort = 0;
  477.         conf->upstream.send_lowat = 0;
  478.         conf->upstream.bufs.num = 0;
  479.         conf->upstream.busy_buffers_size = 0;
  480.         conf->upstream.max_temp_file_size = 0;
  481.         conf->upstream.temp_file_write_size = 0;
  482.         conf->upstream.intercept_errors = 1;
  483.         conf->upstream.intercept_404 = 1;
  484.         conf->upstream.pass_request_headers = 0;
  485.         conf->upstream.pass_request_body = 0;

  486.         conf->uid_index = NGX_CONF_UNSET;
  487.         conf->path_index = NGX_CONF_UNSET;
  488.         conf->recusive_index = NGX_CONF_UNSET;
  489.         
  490.         return conf;
  491. }


  492. static char *
  493. ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
  494. {
  495.         ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;
  496.         ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;

  497.         ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
  498.                                   prev->upstream.connect_timeout, 60000);

  499.         ngx_conf_merge_msec_value(conf->upstream.send_timeout,
  500.                                   prev->upstream.send_timeout, 60000);

  501.         ngx_conf_merge_msec_value(conf->upstream.read_timeout,
  502.                                   prev->upstream.read_timeout, 60000);

  503.         ngx_conf_merge_size_value(conf->upstream.buffer_size,
  504.                                   prev->upstream.buffer_size,
  505.                                   (size_t)ngx_pagesize);

  506.         ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
  507.                                      prev->upstream.next_upstream,
  508.                                      (NGX_CONF_BITMASK_SET
  509.                                       | NGX_HTTP_UPSTREAM_FT_ERROR
  510.                                       | NGX_HTTP_UPSTREAM_FT_TIMEOUT));

  511.         if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)
  512.         {
  513.                 conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
  514.                                                | NGX_HTTP_UPSTREAM_FT_OFF;
  515.         }

  516.         if (conf->upstream.upstream == NULL)
  517.         {
  518.                 conf->upstream.upstream = prev->upstream.upstream;
  519.         }

  520.         if (conf->uid_index == NGX_CONF_UNSET) {
  521.                 conf->uid_index = prev->uid_index;
  522.         }
  523.         if (conf->path_index == NGX_CONF_UNSET) {
  524.                 conf->path_index = prev->path_index;
  525.         }
  526.         if (conf->recusive_index == NGX_CONF_UNSET) {
  527.                 conf->recusive_index = prev->recusive_index;
  528.         }

  529.         return NGX_CONF_OK;
  530. }


  531. static char *
  532. ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
  533. {
  534.         ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;

  535.         ngx_str_t *value;
  536.         ngx_url_t u;
  537.         ngx_http_core_loc_conf_t *clcf;

  538.         if (mlcf->upstream.upstream)
  539.         {
  540.                 return "is duplicate";
  541.         }

  542.         value = (ngx_str_t *)cf->args->elts;

  543.         ngx_memzero(&u, sizeof(ngx_url_t));

  544.         u.url = value[1];
  545.         u.no_resolve = 1;

  546.         mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
  547.         if (mlcf->upstream.upstream == NULL)
  548.         {
  549.                 return (char *)(NGX_CONF_ERROR);
  550.         }

  551.         clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);

  552.         clcf->handler = ngx_http_xdrive_rc_handler;

  553.         if (clcf->name.data[clcf->name.len - 1] == '/')
  554.         {
  555.                 clcf->auto_redirect = 1;
  556.         }

  557.         //保存变量index用
  558.         mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[0].name);
  559.         if (mlcf->uid_index == NGX_ERROR)
  560.         {
  561.                 return (char *)(NGX_CONF_ERROR);
  562.         }
  563.         mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[1].name);
  564.         if (mlcf->path_index == NGX_ERROR)
  565.         {
  566.                 return (char *)(NGX_CONF_ERROR);
  567.         }
  568.         mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[2].name);
  569.         if (mlcf->recusive_index == NGX_ERROR)
  570.         {
  571.                 return (char *)(NGX_CONF_ERROR);
  572.         }

  573.         return NGX_CONF_OK;
  574. }



  575. static ngx_int_t
  576. ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)
  577. {
  578.         ngx_http_variable_t *var, *v;

  579.         for (v = ngx_http_proxy_vars; v->name.len; v++)
  580.         {
  581.                 var = ngx_http_add_variable(cf, &v->name, v->flags);
  582.                 if (var == NULL)
  583.                 {
  584.                         return NGX_ERROR;
  585.                 }

  586.                 var->get_handler = v->get_handler;
  587.                 var->data = v->data;
  588.         }

  589.         return NGX_OK;
  590. }

代码中一些有意思的地方:
  1. //和buf差不多的思想的 buf chain
  2. typedef  struct
  3. {
  4.         ngx_chain_t* _chain_head;
  5.         ngx_chain_t* _chain_pos;
  6.         ngx_chain_t* _chain_last;
  7.         ngx_chain_t* _chain_tail;
  8. ngx_chain_pair_t;

  9. //从buf chain中读取len长内存出来
  10. size_t ngx_cdecl
  11. ngx_chain_read(ngx_chain_pair_t* chain_pair
  12.                 , uint8_t *buf, uint32_t len);
 //将buf写入到buf chain中
  1. ngx_int_t ngx_cdecl
  2. ngx_chain_write(ngx_pool_t* pool
  3.                 , ngx_chain_t** free_bufs
  4.                 , ngx_chain_pair_t* chain_pair
  5.                 , size_t write_chunk_size
  6.                 , const uint8_t *buf, uint32_t len);
 //写json或者xml之类回复有用
  1. ngx_int_t ngx_cdecl
  2. ngx_chain_sprintf(ngx_pool_t *pool
  3.              , ngx_chain_t **free_bufs
  4.              , ngx_chain_pair_t *chain_pair
  5.              , size_t write_chunk_size
  6.              , const char *fmt, ...);

下面是nginx配置文件中的关键部分
  1. location ~* /rc_list/([0-9]+).html$ {
  2.                 xdrive_rc_buffer_size 4096;
  3.                 set $uid $1;
  4.                 set $path /;
  5.                 set $recusive 0;
  6.                 if ($query_string ~* (|&)recusive=(0|1)(|&)) {
  7.                         set $recusive $2;
  8.                 }
  9.                 xdrive_rc_pass 127.0.0.1:11001;
  10.         }
  1. 解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
  2. recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
  3. 思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;
阅读(12199) | 评论(1) | 转发(1) |
给主人留下些什么吧!~~

ftpeng2013-05-09 23:15:46

nginx自定义模块开发那个,我根据你的代码做了一些变通。有兴趣我们交流一下