Chinaunix首页 | 论坛 | 博客
  • 博客访问: 70447
  • 博文数量: 28
  • 博客积分: 96
  • 博客等级: 民兵
  • 技术积分: 151
  • 用 户 组: 普通用户
  • 注册时间: 2011-12-01 23:56
文章分类
文章存档

2012年(13)

2011年(15)

分类:

2011-12-15 02:36:46

作者:gfree.wind@gmail.com
博客:blog.focus-linux.net   linuxfocus.blog.chinaunix.net
 
 
本文的copyleft归gfree.wind@gmail.com所有,使用GPL发布,可以自由拷贝,转载。但转载请保持文档的完整性,注明原作者及原链接,严禁用于任何商业用途。
======================================================================================================

有一段时间没有更新了,开始看《深入理解Linux内核架构》,感觉写得很不错。不过暂时还是在学习阶段,所以没有什么可写的。为了保持自己学习的动力,今天继续学习一下zeromq吧。

上次看到了zmq_init中的reaper->start ();以及其对应的工作线程,今天就继续后面的代码。

  1. zmq::ctx_t::ctx_t (uint32_t io_threads_) :
  2.     tag (0xbadcafe0),
  3.     terminating (false)
  4. {
     。。。。。。
      
  1.     // Create I/O thread objects and launch them.
  2.     /*
  3.     为什么i从2开始呢?这里的i是为slot的索引,在前面的代码中slot[0]为zmq_term对应的mailbox,即&term_     mailbox ,slot[1]为reaper线程对应的mailbox。所以I/O线程的mailbox的索引i只能从2开始。
    
     这里创建并启动了I/O线程,并加入到slots中。
  1.     */
  2.     for (uint32_t i = 2; i != io_threads_ + 2; i++) {
  3.         io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
  4.         alloc_assert (io_thread);
  5.         io_threads.push_back (io_thread);
  6.         slots [i] = io_thread->get_mailbox ();
  7.         io_thread->start ();
  8.     }

     //  In the unused part of the slot array, create a list of empty slots.
     /* 如注释所言,保存空的slot 索引*/
     for (int32_t i = (int32_t) slot_count - 1;
          i >= (int32_t) io_threads_ + 2; i--) {
         empty_slots.push_back (i);
         slots [i] = NULL;
     }

     //  Create the logging infrastructure.
     log_socket = create_socket (ZMQ_PUB);
     zmq_assert (log_socket);
     rc = log_socket->bind ("sys://log");
     zmq_assert (rc == 0);
  1. }

先看一下I/O thread的工作线程:
  1. void zmq::epoll_t::loop ()
  2. {
  3.     epoll_event ev_buf [max_io_events];

  4.     while (!stopping) {

  5.         // Execute any due timers.
  6.         int timeout = (int) execute_timers ();

  7.         // Wait for events.
  8.         int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events,
  9.             timeout ? timeout : -1);
  10.         if (n == -1 && errno == EINTR)
  11.             continue;
  12.         errno_assert (n != -1);

  13.         for (int i = 0; i < n; i ++) {
  14.             poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);

  15.             if (pe->fd == retired_fd)
  16.                 continue;
  17.             if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
  18.                 pe->events->in_event ();
  19.             if (pe->fd == retired_fd)
  20.                continue;
  21.             if (ev_buf [i].events & EPOLLOUT)
  22.                 pe->events->out_event ();
  23.             if (pe->fd == retired_fd)
  24.                 continue;
  25.             if (ev_buf [i].events & EPOLLIN)
  26.                 pe->events->in_event ();
  27.         }

  28.         // Destroy retired event sources.
  29.         for (retired_t::iterator it = retired.begin (); it != retired.end ();
  30.               ++it)
  31.             delete *it;
  32.         retired.clear ();
  33.     }
  34. }
这个工作流程和机制和reaper线程的工作函数zmq::select_t::loop很相似。
1. 先执行到期的timer,并计算出合适的timeout;
2. 使用epoll监视事件,并根据事件类型执行不同的操作;
3. 销毁过期的poller对象;

再看一下bind的代码,这个zmq_init基本上就结束了。
  1. int zmq::socket_base_t::bind (const char *addr_)
  2. {
  3.     if (unlikely (ctx_terminated)) {
  4.         errno = ETERM;
  5.         return -1;
  6.     }

  7.     // Parse addr_ string.
  8.     std::string protocol;
  9.     std::string address;
  10.     /*
  11.     按照zeromq的格式即protocol://address解析uri
  12.     得到protocol和address
  13.     */
  14.     int rc = parse_uri (addr_, protocol, address);
  15.     if (rc != 0)
  16.         return -1;
     // 检测是否为zeromq支持的协议类型
  1.     rc = check_protocol (protocol);
  2.     if (rc != 0)
  3.         return -1;

  4.     if (protocol == "inproc" || protocol == "sys") {
  5.         /* 
  6.         为本机的一个进程间的通信,即线程间通信.
  7.         因为使用内存间的直接通信,所以无需使用I/O thread
  8.         */
  9.         endpoint_t endpoint = {this, options};
  10.         return register_endpoint (addr_, endpoint);
  11.     }

  12.     if (protocol == "tcp" || protocol == "ipc") {

  13.         // Choose I/O thread to run the listerner in.
  14.         /* 根据亲和性以及线程的负载load挑选一个合适的I/O线程 */
  15.         io_thread_t *io_thread = choose_io_thread (options.affinity);
  16.         if (!io_thread) {
  17.             errno = EMTHREAD;
  18.             return -1;
  19.         }

  20.         // Create and run the listener.
  21.         /* 根据协议和地址,创建一个listener,并将其bind到一个I/O线程。*/
  22.         zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
  23.             io_thread, this, options);
  24.         alloc_assert (listener);
  25.         int rc = listener->set_address (protocol.c_str(), address.c_str ());
  26.         if (rc != 0) {
  27.             delete listener;
  28.             return -1;
  29.         }
  30.         launch_child (listener);

  31.         return 0;
  32.     }

  33.     if (protocol == "pgm" || protocol == "epgm") {

  34.         // For convenience's sake, bind can be used interchageable with
  35.         // connect for PGM and EPGM transports.
  36.         return connect (addr_);
  37.     }

  38.     zmq_assert (false);
  39.     return -1;
  40. }

看到bind的实现,总算到了zeromq有点意思的地方了。今天对于bind看得很粗略,下次会仔细的学习一下bind的代码——特别是protocol==tcp和ipc

阅读(1222) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~