Chinaunix首页 | 论坛 | 博客
  • 博客访问: 750065
  • 博文数量: 116
  • 博客积分: 923
  • 博客等级: 准尉
  • 技术积分: 1635
  • 用 户 组: 普通用户
  • 注册时间: 2011-10-06 21:43
个人简介

一直帮老板搬运代码!!!

文章分类
文章存档

2013年(47)

2012年(69)

分类: LINUX

2013-01-30 22:15:42

我把nginx处理分为,非配置upstream处理和配置upstream处理两种,至于使用upstream处理,将在下一篇文章叙述;

下面我来说nginx request处理:


先说多进程:

我的多进程调试方法很简单

第一步:在ngx_event_process_init(ngx_cycle_t *cycle) 方法的下面添加对应“我的添加”

  if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
        ngx_use_accept_mutex = 1;
        ngx_accept_mutex_held = 0;
        ngx_accept_mutex_delay = ecf->accept_mutex_delay; //延迟的秒数500

    } else {
        ngx_use_accept_mutex = 0;
    }

    //我的修改
    ngx_use_accept_mutex = 1;
    ngx_accept_mutex_held = 0;
    ngx_accept_mutex_delay = ecf->accept_mutex_delay; //延迟的秒数500

第二步,去掉其他设置多进程的步骤,让其能运行,并且,把多进程改成多线程

main方法的这里注释

//    //对所有指定的信号进行初始化
//    if (ngx_init_signals(cycle->log) != NGX_OK) {
//        return 1;
//    }

    if (!ngx_inherited && ccf->daemon) {
//        if (ngx_daemon(cycle->log) != NGX_OK) {//守护进程的处理
//            return 1;
//        }

//        ngx_daemonized = 1;
    }


//下面这里可以改成1或2,方便调试

ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)

 for (i = 0; i < 1; i++) { //生成n-8个worker process进程

//        cpu_affinity = ngx_get_cpu_affinity(i); //获得CPu亲和性
        // ngx_spawn_process 创建 worker 子进程并初始化相关资源和属性,
        // 然后执行子进程的执行函数 ngx_worker_process_cycle
        ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
                          "worker process", type); //产生worker进程

ngx_spawn_process 这里改成线程  if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) { //设置将接收SIGIO和SIGURG信号的进程id或进程组ID,进程组id通过提供负值的arg来说明,否则,arg将被认为所进程id
//            //SIGURG网络传来带外数据时产生,也是紧急套接口,SIGIO异步id(设置异步后必须设置),异步io是SIGIO和SIGURG组合
//            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
//                          "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
//            ngx_close_channel(ngx_processes[s].channel, cycle->log);
//            return NGX_INVALID_PID;
//        }
// // 执行了 exec 后关闭管道句柄
//        if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) { //设置FD_CLOEXEC标志,在子进程里面没有关闭,在EXEC里面关闭,不拷贝进入
//            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
//                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
//                           name);
//            ngx_close_channel(ngx_processes[s].channel, cycle->log);
//            return NGX_INVALID_PID;
//        }

//        if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
//            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
//                          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
//                           name);
//            ngx_close_channel(ngx_processes[s].channel, cycle->log);
//            return NGX_INVALID_PID;
//        }
        // 设臵当前子进程的管道句柄
//        ngx_channel = ngx_processes[s].channel[1]; //给写的管道

    } else {
        ngx_processes[s].channel[0] = -1;
        ngx_processes[s].channel[1] = -1;
    }
    // 设臵当前子进程的进程表项索引
    ngx_process_slot = s; //第几个位置

    // 创建子进程
//    pid = fork();//创建一个进程
//
//    switch (pid) {
//
//    case -1:
//        ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
//                      "fork() failed while spawning \"%s\"", name);
//        ngx_close_channel(ngx_processes[s].channel, cycle->log);
//        return NGX_INVALID_PID;
//
//    case 0:
    // 创建子进程
//        ngx_pid = ngx_getpid();
    // 子进程运行执行函数
//        proc(cycle, data); //会创建deamon进程,把这屏蔽掉才能走下一步然后子进程的回调是ngx_cache_manager_process_cycle这个函数,这个函数比较简单,就是设置定时器:
//        break;
//
//    default:
//        break;
//    }

    //建立子线程

    thread_pass[ngx_process_slot] = ngx_alloc(sizeof(ngx_thread_pass), cycle->log);

    thread_pass[ngx_process_slot]->ngx_channel = ngx_processes[s].channel[1];
    thread_pass[ngx_process_slot]->proc = proc;
    thread_pass[ngx_process_slot]->data = data;
    thread_pass[ngx_process_slot]->cycle = cycle;

    pthread_create(&pid , NULL , start_rtn, thread_pass[ngx_process_slot]);
    usleep(1000);


void *start_rtn(void * arg)
{
    ngx_thread_pass * tp = arg;
    WriteLog("Log",1,"start_rtn 启动参数-->线程数目:%ld - %ld \n\t",tp->ngx_channel,pthread_self());
    tp->proc(tp->cycle, tp->data,tp->ngx_channel); //会创建deamon进程,把这屏蔽掉才能走下一步然后子进程的回调是ngx_cache_manager_process_cycle这个函数,这个函数比较简单,就是设置定时器:
    WriteLog("Log",1,"end\n\t");
    pthread_exit(0);
}

这样就可以调试了!!!!


nginx处理从这里开始:

ngx_epoll_process_events方法的,events = epoll_wait(ep, event_list, (int) nevents, timer);开始。

如果是多进程,开始的时候,每个进程都会走一次events = epoll_wait(ep, event_list, (int) nevents, timer); 返回的是父子通信的管道id和读取管道的信息。


第二次就在events = epoll_wait(ep, event_list, (int) nevents, timer);永久等待用户的请求链接。


请求第一次循环:

用户请求到来,epoll_wait方法返回,根据初始化设置回调,多进程在这里设置ngx_process_events_and_timers(ngx_cycle_t *cycle) ,单进程在ngx_single_process_cycle这里的 for (i = 0; ngx_modules[i]; i++) {
        if (ngx_modules[i]->init_process) {
            if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {
                /* fatal */
                exit(2);
            }
        }
    }

设置请求(rev->handler)为:ngx_event_accept ,在这个函数里面设置请求并且添加接收计时器:ngx_add_timer(rev, c->listening->post_accept_timeout);)

并且通过: queue = (ngx_event_t **) (rev->accept ?&ngx_posted_accept_events : &ngx_posted_events);
                ngx_locked_post_event(rev, queue); 把请求放到ngx_posted_accept_events队列当中,目的解决单进程并发问题(和多进程原理一样)


然后在这里处理: if (ngx_posted_accept_events) {
        ngx_event_process_posted(cycle, &ngx_posted_accept_events); //处理传递
    }


请求第二次循环

  queue = (ngx_event_t **) (rev->accept ?&ngx_posted_accept_events : &ngx_posted_events);
                ngx_locked_post_event(rev, queue);

//事件放到ngx_posted_events里面
这里处理: if (ngx_posted_events) {
        if (ngx_threaded) {
            ngx_wakeup_worker_thread(cycle);

        } else {
            ngx_event_process_posted(cycle, &ngx_posted_events);//事件的处理post event。
        }
    } //调用这个方法ngx_event_process_posted(cycle, &ngx_posted_events);


方法里面大概做了那些事情呢:

ngx_delete_posted_event(ev) 删除事件并且处理事件:ngx_http_init_request然后

建立和获取ngx_http_connection_t


建立 r = ngx_pcalloc(c->pool, sizeof(ngx_http_request_t));否则ngx_memzero(r, sizeof(ngx_http_request_t));


 c->requests++; //请求数增加
rev->handler = ngx_http_process_request_line; //设置处理回调
r->read_event_handler = ngx_http_block_reading;

创建if (c->buffer == NULL) {
        c->buffer = ngx_create_temp_buf(c->pool,
                                        cscf->client_header_buffer_size);
创建 r->pool = ngx_create_pool(cscf->request_pool_size, c->log);

创建配置 r->ctx = ngx_pcalloc(r->pool, sizeof(void *) * ngx_http_max_module);

创建变量r->variables = ngx_pcalloc(r->pool, cmcf->variables.nelts
                                        * sizeof(ngx_http_variable_value_t));

1、处理请求行ngx_http_process_request_line

循环处理
ngx_http_read_request_header-》ngx_http_parse_request_line
 rev->handler = ngx_http_process_request_headers;
            ngx_http_process_request_headers(rev);

初始化: if (ngx_list_init(&r->headers_in.headers, r->pool, 20,
                              sizeof(ngx_table_elt_t))

 if (ngx_array_init(&r->headers_in.cookies, r->pool, 2,
                               sizeof(ngx_table_elt_t *))

2、处理请求头并且解析
ngx_http_process_request_headers 就是解析把数据保存到里面( if (hh && hh->handler(r, h, hh->offset) != NGX_OK) {)
循环处理
 n = ngx_http_read_request_header(r);//首先调用ngx_http_read_request_header来读取头部
rc = ngx_http_parse_header_line(r, r->header_in,
                                        cscf->underscores_in_headers);
 h = ngx_list_push(&r->headers_in.headers);

ngx_http_process_host


3、上面完成后,给客户端发送请求(数据)

rc = ngx_http_process_request_header(r);
 ngx_http_process_request(r);

//删除时间
if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

改变读写状态
 r->stat_reading = 0;
    (void) ngx_atomic_fetch_add(ngx_stat_writing, 1);
    r->stat_writing = 1;

 c->read->handler = ngx_http_request_handler;
    c->write->handler = ngx_http_request_handler;
    r->read_event_handler = ngx_http_block_reading;

//处理请求
 ngx_http_handler(r);

//写出
 r->write_event_handler = ngx_http_core_run_phases;
    ngx_http_core_run_phases(r);
//处理解析
 ngx_http_run_posted_requests(c);

到这里,数据已经完全发送给客户端了


4、收尾工作

r->keepalive = 1;

//修改链接的时间

ngx_add_timer(rev, clcf->keepalive_timeout);

ngx_http_finalize_connection(r);

单进程在 ngx_http_finalize_connection(r); 里面的ngx_http_set_keepalive(r); 设置

导致ev (ngx_event_process_posted(ngx_cycle_t *cycle,
    ngx_thread_volatile ngx_event_t **posted))走第一次(这个里面已经重用connect了,是不是多余啊)


单进程和多进程的区别://单进程不走post而已,其他一样


            if (flags & NGX_POST_EVENTS) {

               //多进程走这里
                queue = (ngx_event_t **) (rev->accept ?&ngx_posted_accept_events : &ngx_posted_events);
                ngx_locked_post_event(rev, queue); //外加  ngx_event_process_posted(cycle, &ngx_posted_events);

            } else {

                //单进程走这里
                rev->handler(rev); //处理接收
            }


四、超时(红黑数拿到最小时间来定时)

时间到了

if (events == 0) {
        if (timer != NGX_TIMER_INFINITE) {
            return NGX_OK;
        } //这里退出

让上层循环处理超时

if (delta) {
        ngx_event_expire_timers(); //里面把connetion冲epoll里面删除和关闭客户端链接,调用ngx_http_keepalive_handler完成
    }

总结:

1、accept的时候 添加定时器 ngx_add_timer(rev, c->listening->post_accept_timeout);)和初始化connect的工作

2、接收和处理,初始化给类请求和链接和缓冲区,接收解析请求行和请求头后,删除接收的时间控制ngx_del_timer(c->read);

3、发送给客户端数据,数据发送完后,改变状态(r->keepalive = 1;),添加套接口活动时间:ngx_add_timer(rev, clcf->keepalive_timeout);和收尾工作ngx_http_finalize_connection(r);

4、时间到,调用ngx_event_expire_timers(); 处理ngx_http_keepalive_handler关闭链接和重用套接口


精简:如果不使用http协议即可在ngx_http_init_request 添加接收和处理用户信息,然后ngx_add_time 即可


附加:使用upstream的方式

wev->handler(wev);这个是后端服务器的操作
接受:
第一
 rev->handler(rev); -》ngx_event_accept(ngx_event_t *ev)

处理:
第二
rev->handler(rev); -》rev->handler = 0x439183
第二次循环 有两个
 wev->handler》wev->handler = 0x43c4b5 //第一次循环
 rev->handler(rev); ->ngx_http_upstream_handler(发送数据)//第二次循环

超时:
(直接返回,调用 ngx_event_expire_timers();)




阅读(2566) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~