Chinaunix首页 | 论坛 | 博客
  • 博客访问: 127723
  • 博文数量: 69
  • 博客积分: 2895
  • 博客等级: 少校
  • 技术积分: 710
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-03 18:05
文章分类

全部博文(69)

文章存档

2010年(69)

我的朋友

分类:

2010-09-09 13:40:11

背景
hadoop当中的每一个datanode上,都会保存 一些HDFS中的blocks,而 这些blocks实际上都是按照一定的格式保存在 datanode这台机器的某些本地目录中的,当通过hadoop向HDFS中保存文件的时候,这些文件就会被hadoop按照blocksize切分成 多个blocks,并按照一定的负载和调度算法和配置文件中的每个block的副本数分配到集群的某些datanode上去。而且hadoop最近的 版本还支持datanode上将HDFS对应的本地目录设置成多个。这个非常有价值,因为通过这个配置,可以在集群的datanode上挂载多个磁盘,每个磁盘挂载在不同的目录下,然后在 hadoop-site.xml中将datanode的dfs.data.dir配置成由逗号分开的多个目录,这 些目录分别对应了多个挂载的磁盘。这样可以在集群的io非常高的时候将io操作分配到各个磁盘上去,减少磁盘的压力和出错的几率。但是,根据对目前 0.19.0的版本中的源代码的研究发现,当多个磁盘中的任何一个crash后,整个datanode就会shutdown它自己,而不管其他的磁盘是不 是 仍然可以工作。这样其实就产生了问题:因为如果仅仅是一个磁盘发生错误就把整个datanode给停掉,那么namenode就势必会在一定的时间间隔后 将这台datanode上保存的所有的blocks转存到其他的datanode上去,以保持blocks的副本数不低于hadoop配置文件中 dfs.replication配置项指定的数目。但是这种情况下其实出现问题的那台datanode上仅仅是一个磁盘上blocks需要被转存,其他好 的磁盘上的blocks是仍然可用的,这样就会增加集群中的负载和集群中所有机器的负载,而且还会造成资源的浪费和集群资源的不合理使用。如果 datanode上有好几十TB的,那么这个过程的代价将会更加严重。而更加合理的处理应该是:即使datanode上有某一个磁盘或者多个磁盘 crash了,只要不是全部crash,datanode应该仍然保持工作,并期待坏掉的磁盘能够在一定的时间内被repaired,然后重新插入机器中 并重新开始工作。这些磁盘上原本保存的blocks能恢复最好,即使不能恢复,由于namenode有replication机制,也可以保持这些 blocks的副本数不会低于dfs.replication的配置。 因此,可行的办法就是研究hadoop这个部分的代码,对它进行修改,以支持以上所说的这种功能。

代码Hack

hadoop中关于这部分的代码被pack在了 org.apache.hadoop.hdfs.server.datanode中,主要的三个类是 DataNode,FSDataSet和DataBlockScanner,其中Datanode类就是一个datanode运行instance的抽 象,FSDataSet用来表示datanode节点上关于磁盘配置的信息和一些处理接口,而DataBlockScanner是一个线程,用来不断的检 查该datanode上的blocks信息。运行的机制是这样的:

  • 当datanode启动的时候,会初始化许多的信息,如和 namenode通信的socket信息,从hadoop配置文件中读取的配置信息,并利用这些配置信息初始化该datanode instance
  • 每 个datanode中都会有多个内部线程在轮询的作一些操作,其中有一个为DataTransfer,用来向其他datanode传输block数据。 datanode启动过程中同样会把这个线程启动。
  • 启动datanode时,同样还会将DataBlockScanner线程启动,这 个线程用来keep track datanode上的block和更新信息。
  • datanode中还保存了一个FSDataset的,它用来记录当前 datanode上关于磁盘的配置信息,以及这些磁盘或者路径下中保存的 HDFS分布式文件中的信息。通过对 hadoop配置文件的读取,datanode也会初始化这个FSDataset类的instance。
  • datanode本身也是一个 线程类,它的run()中会间歇的调用一个方法:offerService(),这个方法里记录处理的就是datanode的核心处理逻辑。这当中的处理包 括:
    • 每隔3秒钟向namenode发送一次自己的heartbeat信息,这些信息被namenode接收到以后会根据对该 heartbeat的分析向 datanode返回一个datanode需要的操作(DatanodeCommand),并根据从namenode返回的这个 DatanodeCommand来作自己相应的操作。
    • 然后会检查本datanode是否有接收到新的block,并作相应的处理
    • 然 后检查上一次向namenode进行block report的时间,如果超过一定的时间(默认为1小时).就向namenode发送一次block report,以便让namenode上记录的信息保持更新。
    • 每一次接收到来自namenode的操作信息 (DatanodeCommand),datanode都会作相应的操作。

在datanode操作hdfs时, 它会先从它内部保存的FSDataSet实例中得到下一个轮转到的FSVolume,这么一个FSVolume代表了dfs.data.dir的配置项中 用逗号分隔开的某一个本地磁盘目录,然后FSDataSet实例会试着在这个FSVolume中的FSDir实例的checkDirTree()方法:
public void checkDirTree() throws DiskErrorException {
      DiskChecker.checkDir(dir);
           
      if (children != null) {
        for (int i = 0; i < children.length; i++) {
          children[i].checkDirTree();
        }
      }
    }
从程序中可以看 出,实际上它是首先用一个DiskChecker类来check这个dir是否是合法的,然后再check 这个dir的子目录,而判断这个dir是否合法的逻辑如下:
if (!mkdirsWithExistsCheck(dir))
      throw new DiskErrorException("can not create directory: "
                                   + dir.toString());
       
    if (!dir.isDirectory())
      throw new DiskErrorException("not a directory: "
                                   + dir.toString());
           
    if (!dir.canRead())
      throw new DiskErrorException("directory is not readable: "
                                   + dir.toString());
           
    if (!dir.canWrite())
      throw new DiskErrorException("directory is not writable: "
                                   + dir.toString());
从程序中可以看出,实际上,datanode首先尝试在这个dir中创建一个子目录,然后判断这个目录是 否是一个合法的目录,是否可写,是否可读,一旦这几个判断的任何一个发生错误,datanode就认为这个目录出现了问题,于是抛出 DiskErrorException,0.19.0的hadoop此时会把这个异常连续的向上的调用抛出,直到FSVolumeSet实例的 checkDir(),此时datanode发现磁盘错误,然后shutdown()它自己,datanode退出集群。这就是目前datanode处理 磁盘的逻辑。但是想想可以发现,这样的逻辑其实不是最好的,因为就如上面开头描述的那样,此时如果datanode上配置了多磁盘,很有可能其他的磁盘都 是好的,可以继续工作,需要修复或者copy副本到其他datanode的blocks仅仅是这块坏掉的磁盘上的blocks。
既然明白了 datanode处理磁盘错误的逻辑,就可以自己修改datanode的实现代码,来满足自己的需要。

由于datanode关于磁盘的检 错的调用流程为DataNode.checkDiskError( ) -> FSDataSet.checkDataDir() -> FSVolume.checkDirs(),就在这一步,一旦任何一块磁盘发生异常,就把一场抛给了Datanode,datanode于是 shutdown(),并等待人员的修复,并在一 段时间之后开始拷贝这个datanode上的副本到其他的datanode上去。
所以,在FSVolume的checkDirs()方法中,可以 做如下修改:
List goodVolumes = new ArrayList();
      for (int idx = 0; idx < volumes.length; idx++) {
        try {
            volumes[idx].checkDirs();
            goodVolumes.add(volumes[idx]);
        } catch (DiskErrorException e) {
            synchronized(crashedVolumes){
                crashedVolumes.add(volumes[idx]);
            }
        }
      }
      if(goodVolumes.size() == 0) {
          throw new AllDiskErrorException("All " + volumes.length + " disk(s) error: ");
      } else if (volumes.length - goodVolumes.size() > 0) {
          volumes = goodVolumes.toArray(new FSVolume[0]);
          throw new DataNodeDiskErrorException(sb.toString());
      }
程序的逻辑为:创建一个新的队 列,用来保存在遍历每一个FSVolume,如果当前的FSVolume是好的,就加入到这个新的goodVolumes队列中去,而一旦出现坏的磁盘或 者dir,就把它加入到crashedVolumes队列中,最后遍历完成后,将goodVolumes中的FSVolume保存为队列重新赋予给 volumes。

同时,在datanode中create一个线程,让它没过一段时间去check,看是否crashedVolumes 的队列中是否有FSVolume的实例,如果有是否已经repaired,如果没有就继续等待下一次check,代码如下:
class CrashVolumeChecker implements Runnable {
        public void run() {
            while (true) {
                if (data.checkCrashedVolumes()) {
                    try {
                        data.checkDataDir();
                        reBlockReport();
                    } catch (DataNodeDiskErrorException de) {
                        handleDiskError(de.getMessage());
                    } catch (AllDiskErrorException de) {
                        handleAllDiskError(de.getMessage());
                    }
                }
                try {
                    Thread.sleep(CRASH_VOLUME_CHECK_INTERVAL);
                } catch (InterruptedException ie) {
                }
            }
        }
    }
然后再在datanode的run()中将这个线程启动,就可以了。

阅读(541) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~