Chinaunix首页 | 论坛 | 博客
  • 博客访问: 127721
  • 博文数量: 69
  • 博客积分: 2895
  • 博客等级: 少校
  • 技术积分: 710
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-03 18:05
文章分类

全部博文(69)

文章存档

2010年(69)

我的朋友

分类:

2010-09-09 13:29:02

集群里老是出现这种情况,不管是调用fs -put,还是在中要往DFS上写block数 据,又或是某些其他的进程调用了DFS的接口来往HDFS写,常常会在某些特定的情况下,进入死循环,如果是调用hadoop fs -put的话,就会一直出不来,并且不停的报类似如下的错:
10/01/20 14:27:45 INFO hdfs.DFSClient: Exception in createBlockOutputStream java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/hostname201:41498 remote=/hostname115:50010]10/01/20 14:27:45 INFO hdfs.DFSClient: Abandoning block blk_4211065766480621095_20388281710/01/20 14:27:45 INFO hdfs.DFSClient: Waiting to find target node: hostname105:50010

而如果是在task中调用的话,就会造成这个task永远也无法正常退出,而如果在 jobtracker中调用,就会造成jobtracker陷入死循环,不停的报类似以上的错误,并无法继续正常。

究其原因,发现不管是哪种情况,都是 因为调用了的接口来尝 试用pipeline的方式对数据进行写入,而由于DFSClient中的代码的, 导致这种现象的产生。仔细的研究DFSClient的源码,问题就暴露出来了:

      当DFSClient想要向HDFS写入数据时,会先联系namenode,从nn出会返回一个datanode的列表(nodes),这个列表根据客户 端配置中的副本数返回相应个数的datanode列表。得到这个列 表后,DFSClient就会尝试往这几个datanode上写入block数据。但是,当写入过程中,该datanode出现问题,或者由于这个 datanode非常繁忙,导致它上的DataXiever线程数达到配置上限,此时DFSClient端就会认为写入数据出错。DFSClient中有 一个私有变量errorIndex,用来记录是nodes列表中到底第几个datanode的数据写入出错,当发生以上情况时(例如第二个 datanode写入出错),此时将errorIndex设置为1(第二个),然后一旦发现errorIndex的值不是-1,就表示出错,此时 DFSClient中会另外的线程就会发现,于是将除去坏掉的datanode的另外的几个datanode放入一个新的nodes列表中 (newnodes),但是,此时没有将nodes替换成新的newnodes列表,errorIndex也没有重新设置为-1,于是就造成了,在 DFSClient中的写数据线程DataStreamer的下一次循环再次进入到时候,发现errorIndex不是-1,nodes列表仍然是以前的 老的nodes列表,于是就导致刚才的运行行为不停的重复,导致log中不断的记录相同的,但DFSClient就是无法返回成功的信息。

      所以,的办法很简单,当发现errorIndex为非-1的时 候,在将nodes列表中出问题的datanode去除的列表拷贝如newnodes中后,将nodes用newnodes列表替换,并重新将 errorIndex设置成-1,就可以了。
   
     patch如下:
@@ -2469,21 +2468,30 @@
               // The original bad datanode is left in the list because it is
               // conservative to remove only one datanode in one iteration.
               for (int j = 0; j < nodes.length; j++) {
-                if (nodes[j] ==  primaryNode) {
+                if (nodes[j].equals(primaryNode)) {
                   errorIndex = j; // forget original bad node.
                 }
               }
+              // remove primary node from list
+              newnodes =  new DatanodeInfo[nodes.length-1];
+              System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
+              System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
+                               newnodes.length-errorIndex);
+              nodes = newnodes;
               LOG.warn("Error Recovery for block " + block + " failed " +
                        " because recovery from primary datanode " +
                        primaryNode + " failed " + recoveryErrorCount +
-                       " times. Marking primary datanode as bad.");
+                       " times. " + " Pipeline was " + pipelineMsg +
+                       ". Marking primary datanode as bad.");
               recoveryErrorCount = 0;
+              errorIndex = -1;
               return true;          // sleep when we return from here
             }
             String emsg = "Error Recovery for block " + block + " failed " +
                           " because recovery from primary datanode " +
                           primaryNode + " failed " + recoveryErrorCount +
-                          " times. Aborting...";
+                          " times. "  + " Pipeline was " + pipelineMsg +
+                          ". Aborting...";
             LOG.warn(emsg);
             lastException = new IOException(emsg);
             closed = true;
@@ -2493,7 +2501,8 @@
           LOG.warn("Error Recovery for block " + block + " failed " +
                    " because recovery from primary datanode " +
                    primaryNode + " failed " + recoveryErrorCount +
-                   " times. Will retry...");
+                   " times. "  + " Pipeline was " + pipelineMsg +
+                   ". Will retry...");
           return true;          // sleep when we return from here
         } finally {
           RPC.stopProxy(primary);
阅读(832) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~