The cluster keeps running into the following situation: whether it is hadoop fs -put, a task writing block data to DFS, or some other process writing to HDFS through the DFS client interface, under certain conditions the write gets stuck in an endless loop. In the hadoop fs -put case, the command never returns and keeps logging errors like these:
10/01/20 14:27:45 INFO hdfs.DFSClient: Exception in createBlockOutputStream java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/hostname201:41498 remote=/hostname115:50010]
10/01/20 14:27:45 INFO hdfs.DFSClient: Abandoning block blk_4211065766480621095_203882817
10/01/20 14:27:45 INFO hdfs.DFSClient: Waiting to find target node: hostname105:50010
If the write is issued from inside a task, that task can never exit normally; if it is issued from the jobtracker, the jobtracker falls into the same endless loop, repeatedly logging errors like the above and unable to make further progress.
Digging into the cause, it turns out that in every case the caller is going through the DFSClient interface to write data via a pipeline, and a bug in the DFSClient code produces this behavior. A careful read of the DFSClient source exposes the problem:
When DFSClient wants to write data to HDFS, it first contacts the namenode, which returns a list of datanodes (nodes); the length of the list matches the replication factor in the client configuration. With this list in hand, DFSClient tries to write the block data to those datanodes. If one of the datanodes fails during the write, or is so busy that its DataXceiver thread count hits the configured limit, the client side treats the write as failed. DFSClient keeps a private field, errorIndex, that records which entry in the nodes list failed; if, say, the second datanode fails, errorIndex is set to 1.
As soon as another DFSClient thread sees that errorIndex is no longer -1, it knows an error occurred and copies the remaining datanodes (excluding the bad one) into a new list, newnodes. However, nodes is never replaced by newnodes, and errorIndex is never reset to -1. On the next iteration the write thread, DataStreamer, again finds errorIndex != -1 and the same old nodes list, so the same recovery runs over and over: the log keeps recording the same errors, and DFSClient never reports success.
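To make the loop concrete, here is a heavily simplified sketch of the broken recovery step. This is not the real DataStreamer code: the class, the stub type and the method are hypothetical, and only the names nodes, newnodes and errorIndex mirror DFSClient.

// Simplified sketch of the buggy recovery step; DatanodeInfoStub stands in
// for org.apache.hadoop.hdfs.protocol.DatanodeInfo.
class PipelineRecoverySketch {
    static class DatanodeInfoStub { }

    DatanodeInfoStub[] nodes;   // pipeline targets returned by the namenode
    int errorIndex = -1;        // index of the failed datanode, -1 means healthy

    // Invoked by the streaming loop whenever errorIndex != -1.
    boolean processDatanodeError() {
        // Build a shorter pipeline that leaves out the bad node.
        DatanodeInfoStub[] newnodes = new DatanodeInfoStub[nodes.length - 1];
        System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
        System.arraycopy(nodes, errorIndex + 1, newnodes, errorIndex,
                         newnodes.length - errorIndex);
        // BUG: the two assignments below were missing, so the next iteration
        // still sees the old nodes[] and errorIndex != -1 and repeats the
        // same recovery forever.
        // nodes = newnodes;
        // errorIndex = -1;
        return true;            // caller sleeps and retries
    }
}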
The fix is therefore simple: when errorIndex is found to be something other than -1, after copying the nodes list minus the failed datanode into newnodes, replace nodes with newnodes and reset errorIndex to -1.
The patch is as follows:
@@ -2469,21 +2468,30 @@
// The original bad datanode is left in the list because it is
// conservative to remove only one datanode in one iteration.
for (int j = 0; j < nodes.length; j++) {
- if (nodes[j] == primaryNode) {
+ if (nodes[j].equals(primaryNode)) {
errorIndex = j; // forget original bad node.
}
}
+ // remove primary node from list
+ newnodes = new DatanodeInfo[nodes.length-1];
+ System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+ System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+ newnodes.length-errorIndex);
+ nodes = newnodes;
LOG.warn("Error Recovery for block " + block + " failed " +
" because recovery from primary datanode " +
primaryNode + " failed " + recoveryErrorCount +
- " times. Marking primary datanode as bad.");
+ " times. " + " Pipeline was " + pipelineMsg +
+ ". Marking primary datanode as bad.");
recoveryErrorCount = 0;
+ errorIndex = -1;
return true; // sleep when we return from here
}
String emsg = "Error Recovery for block " + block + " failed " +
" because recovery from primary datanode " +
primaryNode + " failed " + recoveryErrorCount +
- " times. Aborting...";
+ " times. " + " Pipeline was " + pipelineMsg +
+ ". Aborting...";
LOG.warn(emsg);
lastException = new IOException(emsg);
closed = true;
@@ -2493,7 +2501,8 @@
LOG.warn("Error Recovery for block " + block + " failed " +
" because recovery from primary datanode " +
primaryNode + " failed " + recoveryErrorCount +
- " times. Will retry...");
+ " times. " + " Pipeline was " + pipelineMsg +
+ ". Will retry...");
return true; // sleep when we return from here
} finally {
RPC.stopProxy(primary);
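For reference, a minimal client-side write that goes through the same pipeline path looks roughly like this (the HDFS URI and output path are placeholders, not from the original setup). Without the fix, killing or overloading one of the pipeline datanodes mid-write makes close() hang while the client log repeats the errors shown at the top of this post.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PipelineWriteDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hdfs://namenode:9000 and /tmp/pipeline-test are made-up examples.
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), conf);
        FSDataOutputStream out = fs.create(new Path("/tmp/pipeline-test"), (short) 3);
        byte[] buf = new byte[64 * 1024];
        for (int i = 0; i < 1024; i++) {
            out.write(buf);   // DataStreamer pushes these packets down the 3-node pipeline
        }
        out.close();          // never returns if pipeline recovery loops forever
    }
}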