全部博文(247)
分类: HADOOP
2013-07-31 11:35:30
调研总结 by yingru
先记录server端的机制
最初接触RPC,用自己的思路来猜测RPC的实现机制:
Server端开启socket监听,listen()à accept()àread()àwrite()àclose()
有请求来时开启thread处理请求,原进程继续监听,请求完毕后将结果返回给client端
这样设计的缺点:
当访问量大时,并发开启大量线程,会造成server端资源瓶颈。
每个线程中,read()阻塞,直到有数据进来。
Hadoop server端机制:使用JAVA NIO机制(new IO)
JAVA NIO常用的几个类是 Listener,selector, reader, handler, responder
Hadoop中:
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength);
绑定之后创建selector
selector= Selector.open();
以及reader pool
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
每个reader有一个readselector
然后listener 的 selector开始监听:
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
}
accept之后
将对应的selectionkey 转给Reader
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();//获取这个key对应的socket句柄 或者说channel
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
c = new Connection(readKey, channel, System.currentTimeMillis());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
if (LOG.isDebugEnabled())
LOG.debug("Server connection from " + c.toString() +
"; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size());
} finally {
reader.finishAdd();
}
}
}
将连接转给reader
reader是一直在阻塞的
public void run() {
LOG.info("Starting SocketReader");
synchronized (this) {
while (running) {
SelectionKey key = null;
try {
readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
key = null;
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " +
StringUtils.stringifyException(e));
}
} catch (IOException ex) {
LOG.error("Error in Reader", ex);
}
}
}
}
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();//获取这个连接
if (c == null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess();//在这个函数中通过channel读内容 调用channelRead到Buffer里面读数据
} catch (InterruptedException ieo) {
LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": disconnecting client " +
c + ". Number of active connections: "+
numConnections);
closeConnection(c);
c = null;
}
else {
c.setLastContact(System.currentTimeMillis());
}
}
个人这样理解:
传统的socket过程是这样的:在一个socket 上listen --> accept-->返回一个新的fd,这个fd转给了别的线程,去读、写数据
JAVA NIO 是这样的: 在一个socket上listen --> 用一个selector阻塞 accept -->将一个有channel信息的connection转给reader,可以有很多reader,比如hadoop的reader pool -->每个reader有一个selector,selector负责阻塞等待感兴趣的信息,等到后通过keys获得channel的信息,然后通过channel读内容,将内容转换成call,给call queue 后续处理。
所以client端的数据传输并不是通过server端listen的那个socket 而是和传统的socket一样,是通过listen accept后返回的那个socket。
才看一两天 并不是特别确定这个结论
请大家指正,谢谢
======续========
其实JAVA NIO机制中,也可以在listener的selector部分 读取客户端的信息 也就是除了判断信息是不是acceptable之外 也可以判断channel是否readable、writable,但是这样仍然会引发一个阻塞的问题,就是如果readable的信息迟迟不来,这样仍然会阻塞listen线程 hadoop的优化是 把读取信息也异步化了 在reader里面读 有一个reader pool。
NIO的selector负责的是监听和内容分发
关于selector:
Selector与Channel之间的关联由一个SelectionKey实例表示。(注意:一个信道可以注册多个Selector实例,因此可以有多个关联的SelectionKey实例)。 SelectionKey维护了一个信道上感兴趣的操作类型信息,并将这些信息存放在一个int型的位图中,该int型数据的每一位都有相应的含义。SelectionKey类中的常量定义了信道上可能感兴趣的操作类型,每个这种常量都是只有一位设置为1的位掩码。
它在内部可以同时管理多个I/O,当一个信道有I/O操作的时 候,他会通知Selector,Selector就是记住这个信道有I/O操作,并且知道是何种I/O操作,hadoop中,acceptable的操作在listener的selector中来监听,可读的信道操作在reader pool中每个reader的selector来监听。
这样,对客户端请求的accept、信息的read 就分散到几个线程中了,一个listen线程,几个read线程,这些线程来轮询channel。
关于channel:
一个 Channel实例代表了一个“可轮询的”I/O目标,如套接字(或一个文件、设备等)。
Channel能够注册一个Selector类的实例。 Selector的select()方法允许你询问“在一组信道中,哪一个当前需要服务(即,被接收,读或写)”,这两个类都包含在 java.nio.channels包中。
一个 Selector实例可以同时检查一组信道的I/O状态。
Selector与Channel之间的关联由一个SelectionKey实例表示。(注意:一个信道可以注册多个Selector实例,因此可以有多个关联的SelectionKey实例)。 SelectionKey维护了一个信道上感兴趣的操作类型信息,并将这些信息存放在一个int型的位图中,该int型数据的每一位都有相应的含义。SelectionKey类中的常量定义了信道上可能感兴趣的操作类型,每个这种常量都是只有一位设置为1的位掩码。
上面的内容部分摘自http://vaporz.blog.51cto.com/3142258/587229>
问题:selector和channel是不是多对多的
另外 JAVA NIO的另一个好处是 使用了Buffer 代替 stream 使得性能可控 这个还需要再学习研究 具体没有什么概念。