Chinaunix首页 | 论坛 | 博客
  • 博客访问: 72845
  • 博文数量: 13
  • 博客积分: 168
  • 博客等级: 入伍新兵
  • 技术积分: 200
  • 用 户 组: 普通用户
  • 注册时间: 2011-01-20 10:09
文章分类
文章存档

2013年(2)

2012年(6)

2011年(5)

我的朋友

分类: HADOOP

2013-01-20 19:13:02

调研总结 by yingru


 先记录server端的机制

  *最初接触RPC,用自己的思路来猜测RPC的实现机制:

  *Server端开启socket监听,listen()à accept()àread()àwrite()àclose()

  *有请求来时开启thread处理请求,原进程继续监听,请求完毕后将结果返回给client端


  *这样设计的缺点:

  *当访问量大时,并发开启大量线程,会造成server端资源瓶颈。

  *每个线程中,read()阻塞,直到有数据进来。 


Hadoop server端机制:使用JAVA NIO机制(new IO) 

 JAVA NIO常用的几个类是 Listener,selector, reader, handler, responder

Hadoop中:


*Listener:监听RPC server的端口,接受连接,用selector筛选server端感兴趣的连接,然后把连接转发到某个Reader,让Reader去读取那个连接的数据。
*Reader:Reader从某个客户端连接中读取数据流,把它转化成调用对象(Call),放到调用队列(call queue)里
*Handler:从调用队列中获取调用信息,然后调用真正的对象,把调用结果放到响应队列(response queue)里
*Responder:它不断地检查响应队列中是否有调用信息,如果有的话,就把调用的结果返回给客户端。


首先Listener创建socketchannel 相当于传统socket方式中的socket fd  (个人理解。。)


      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);


      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);

绑定之后创建selector

selector= Selector.open();

以及reader pool  

      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      for (int i = 0; i < readThreads; i++) {
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }
每个reader有一个readselector


然后listener 的 selector开始监听:

    while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);   
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } 

accept之后

将对应的selectionkey 转给Reader

void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();//获取这个key对应的socket句柄 或者说channel
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());          
        } finally {
          reader.finishAdd(); 
        }


      }
    }
将连接转给reader


reader是一直在阻塞的

    public void run() {
        LOG.info("Starting SocketReader");
        synchronized (this) {
          while (running) {
            SelectionKey key = null;
            try {
              readSelector.select();
              while (adding) {
                this.wait(1000);
              }              


              Iterator iter = readSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                key = iter.next();
                iter.remove();
                if (key.isValid()) {
                  if (key.isReadable()) {
                    doRead(key);
                  }
                }
                key = null;
              }
            } catch (InterruptedException e) {
              if (running) {                      // unexpected -- log it
                LOG.info(getName() + " caught: " +
                         StringUtils.stringifyException(e));
              }
            } catch (IOException ex) {
              LOG.error("Error in Reader", ex);
            }
          }
        }
      }


  void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();//获取这个连接
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      
      try {
        count = c.readAndProcess();//在这个函数中通过channel读内容 调用channelRead到Buffer里面读数据
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + 
                    c + ". Number of active connections: "+
                    numConnections);
        closeConnection(c);
        c = null;
      }
      else {
        c.setLastContact(System.currentTimeMillis());
      }
    }   


个人这样理解:

传统的socket过程是这样的:在一个socket 上listen --> accept-->返回一个新的fd,这个fd转给了别的线程,去读、写数据

JAVA NIO 是这样的: 在一个socket上listen --> 用一个selector阻塞 accept -->将一个有channel信息的connection转给reader,可以有很多reader,比如hadoop的reader pool -->每个reader有一个selector,selector负责阻塞等待感兴趣的信息,等到后通过keys获得channel的信息,然后通过channel读内容,将内容转换成call,给call queue    后续处理。

所以client端的数据传输并不是通过server端listen的那个socket 而是和传统的socket一样,是通过listen accept后返回的那个socket。

才看一两天 并不是特别确定这个结论

请大家指正,谢谢



======续========


其实JAVA NIO机制中,也可以在listener的selector部分 读取客户端的信息 也就是除了判断信息是不是acceptable之外 也可以判断channel是否readable、writable,但是这样仍然会引发一个阻塞的问题,就是如果readable的信息迟迟不来,这样仍然会阻塞listen线程  hadoop的优化是 把读取信息也异步化了  在reader里面读  有一个reader pool。


NIO的selector负责的是监听和内容分发

关于selector:

SelectorChannel之间的关联由一个SelectionKey实例表示。(注意:一个信道可以注册多个Selector实例,因此可以有多个关联的SelectionKey实例)。 SelectionKey维护了一个信道上感兴趣的操作类型信息,并将这些信息存放在一个int型的位图中,该int型数据的每一位都有相应的含义。SelectionKey类中的常量定义了信道上可能感兴趣的操作类型,每个这种常量都是只有一位设置为1的位掩码。

它在内部可以同时管理多个I/O,当一个信道有I/O操作的时 候,他会通知Selector,Selector就是记住这个信道有I/O操作,并且知道是何种I/O操作,hadoop中,acceptable的操作在listener的selector中来监听,可读的信道操作在reader pool中每个reader的selector来监听。

这样,对客户端请求的accept、信息的read  就分散到几个线程中了,一个listen线程,几个read线程,这些线程来轮询channel。


关于channel:

一个 Channel实例代表了一个“可轮询的”I/O目标,如套接字(或一个文件、设备等)。

Channel能够注册一个Selector类的实例。 Selector的select()方法允许你询问“在一组信道中,哪一个当前需要服务(即,被接收,读或写)”,这两个类都包含在 java.nio.channels包中。

一个 Selector实例可以同时检查一组信道的I/O状态。


SelectorChannel之间的关联由一个SelectionKey实例表示。(注意:一个信道可以注册多个Selector实例,因此可以有多个关联的SelectionKey实例)。 SelectionKey维护了一个信道上感兴趣的操作类型信息,并将这些信息存放在一个int型的位图中,该int型数据的每一位都有相应的含义。SelectionKey类中的常量定义了信道上可能感兴趣的操作类型,每个这种常量都是只有一位设置为1的位掩码。


上面的内容部分摘自http://vaporz.blog.51cto.com/3142258/587229>


问题:selector和channel是不是多对多的


另外 JAVA NIO的另一个好处是 使用了Buffer 代替 stream  使得性能可控 这个还需要再学习研究 具体没有什么概念。


阅读(4117) | 评论(0) | 转发(2) |
给主人留下些什么吧!~~