分类:
2010-09-09 13:40:11
背景
hadoop当中的每一个datanode上,都会保存 一些HDFS中的blocks,而 这些blocks实际上都是按照一定的格式保存在 datanode这台机器的某些本地目录中的,当通过hadoop向HDFS中保存文件的时候,这些文件就会被hadoop按照blocksize切分成 多个blocks,并按照一定的负载和调度算法和配置文件中的每个block的副本数分配到集群的某些datanode上去。而且hadoop最近的 版本还支持datanode上将HDFS对应的本地目录设置成多个。这个非常有价值,因为通过这个配置,可以在集群的datanode上挂载多个磁盘,每个磁盘挂载在不同的目录下,然后在 hadoop-site.xml中将datanode的dfs.data.dir配置成由逗号分开的多个目录,这 些目录分别对应了多个挂载的磁盘。这样可以在集群的io非常高的时候将io操作分配到各个磁盘上去,减少磁盘的压力和出错的几率。但是,根据对目前 0.19.0的版本中的源代码的研究发现,当多个磁盘中的任何一个crash后,整个datanode就会shutdown它自己,而不管其他的磁盘是不 是 仍然可以工作。这样其实就产生了问题:因为如果仅仅是一个磁盘发生错误就把整个datanode给停掉,那么namenode就势必会在一定的时间间隔后 将这台datanode上保存的所有的blocks转存到其他的datanode上去,以保持blocks的副本数不低于hadoop配置文件中 dfs.replication配置项指定的数目。但是这种情况下其实出现问题的那台datanode上仅仅是一个磁盘上blocks需要被转存,其他好 的磁盘上的blocks是仍然可用的,这样就会增加集群中的负载和集群中所有机器的负载,而且还会造成资源的浪费和集群资源的不合理使用。如果 datanode上有好几十TB的,那么这个过程的代价将会更加严重。而更加合理的处理应该是:即使datanode上有某一个磁盘或者多个磁盘 crash了,只要不是全部crash,datanode应该仍然保持工作,并期待坏掉的磁盘能够在一定的时间内被repaired,然后重新插入机器中 并重新开始工作。这些磁盘上原本保存的blocks能恢复最好,即使不能恢复,由于namenode有replication机制,也可以保持这些 blocks的副本数不会低于dfs.replication的配置。 因此,可行的办法就是研究hadoop这个部分的代码,对它进行修改,以支持以上所说的这种功能。
代码Hack
hadoop中关于这部分的代码被pack在了 org.apache.hadoop.hdfs.server.datanode中,主要的三个类是 DataNode,FSDataSet和DataBlockScanner,其中Datanode类就是一个datanode运行instance的抽 象,FSDataSet用来表示datanode节点上关于磁盘配置的信息和一些处理接口,而DataBlockScanner是一个线程,用来不断的检 查该datanode上的blocks信息。运行的机制是这样的:
在datanode操作hdfs时, 它会先从它内部保存的FSDataSet实例中得到下一个轮转到的FSVolume,这么一个FSVolume代表了dfs.data.dir的配置项中 用逗号分隔开的某一个本地磁盘目录,然后FSDataSet实例会试着在这个FSVolume中的FSDir实例的checkDirTree()方法:
public void checkDirTree() throws DiskErrorException {
DiskChecker.checkDir(dir);
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].checkDirTree();
}
}
}
从程序中可以看 出,实际上它是首先用一个DiskChecker类来check这个dir是否是合法的,然后再check 这个dir的子目录,而判断这个dir是否合法的逻辑如下:
if (!mkdirsWithExistsCheck(dir))
throw new DiskErrorException("can not create directory: "
+ dir.toString());
if (!dir.isDirectory())
throw new DiskErrorException("not a directory: "
+ dir.toString());
if (!dir.canRead())
throw new DiskErrorException("directory is not readable: "
+ dir.toString());
if (!dir.canWrite())
throw new DiskErrorException("directory is not writable: "
+ dir.toString());
从程序中可以看出,实际上,datanode首先尝试在这个dir中创建一个子目录,然后判断这个目录是 否是一个合法的目录,是否可写,是否可读,一旦这几个判断的任何一个发生错误,datanode就认为这个目录出现了问题,于是抛出 DiskErrorException,0.19.0的hadoop此时会把这个异常连续的向上的调用抛出,直到FSVolumeSet实例的 checkDir(),此时datanode发现磁盘错误,然后shutdown()它自己,datanode退出集群。这就是目前datanode处理 磁盘的逻辑。但是想想可以发现,这样的逻辑其实不是最好的,因为就如上面开头描述的那样,此时如果datanode上配置了多磁盘,很有可能其他的磁盘都 是好的,可以继续工作,需要修复或者copy副本到其他datanode的blocks仅仅是这块坏掉的磁盘上的blocks。
既然明白了 datanode处理磁盘错误的逻辑,就可以自己修改datanode的实现代码,来满足自己的需要。
由于datanode关于磁盘的检 错的调用流程为DataNode.checkDiskError( ) -> FSDataSet.checkDataDir() -> FSVolume.checkDirs(),就在这一步,一旦任何一块磁盘发生异常,就把一场抛给了Datanode,datanode于是 shutdown(),并等待人员的修复,并在一 段时间之后开始拷贝这个datanode上的副本到其他的datanode上去。
所以,在FSVolume的checkDirs()方法中,可以 做如下修改:
List
for (int idx = 0; idx < volumes.length; idx++) {
try {
volumes[idx].checkDirs();
goodVolumes.add(volumes[idx]);
} catch (DiskErrorException e) {
synchronized(crashedVolumes){
crashedVolumes.add(volumes[idx]);
}
}
}
if(goodVolumes.size() == 0) {
throw new AllDiskErrorException("All " + volumes.length + " disk(s) error: ");
} else if (volumes.length - goodVolumes.size() > 0) {
volumes = goodVolumes.toArray(new FSVolume[0]);
throw new DataNodeDiskErrorException(sb.toString());
}
程序的逻辑为:创建一个新的队 列,用来保存在遍历每一个FSVolume,如果当前的FSVolume是好的,就加入到这个新的goodVolumes队列中去,而一旦出现坏的磁盘或 者dir,就把它加入到crashedVolumes队列中,最后遍历完成后,将goodVolumes中的FSVolume保存为队列重新赋予给 volumes。
同时,在datanode中create一个线程,让它没过一段时间去check,看是否crashedVolumes 的队列中是否有FSVolume的实例,如果有是否已经repaired,如果没有就继续等待下一次check,代码如下:
class CrashVolumeChecker implements Runnable {
public void run() {
while (true) {
if (data.checkCrashedVolumes()) {
try {
data.checkDataDir();
reBlockReport();
} catch (DataNodeDiskErrorException de) {
handleDiskError(de.getMessage());
} catch (AllDiskErrorException de) {
handleAllDiskError(de.getMessage());
}
}
try {
Thread.sleep(CRASH_VOLUME_CHECK_INTERVAL);
} catch (InterruptedException ie) {
}
}
}
}
然后再在datanode的run()中将这个线程启动,就可以了。