This post, the last in the series on block management inside the DataNode, takes a detailed look at FSDatasetInterface, the big component that manages block files directly on the DataNode's behalf. FSDatasetInterface is, of course, only an interface, so the discussion centers on its concrete implementation, FSDataset. FSDataset operates mainly on top of FSVolumeSet, and its core job is to create I/O streams for data blocks. The classes most closely associated with FSDataset — FSVolumeSet, FSVolume, and DatanodeBlockInfo among them — appear throughout the code below.
The sections below walk through FSDataset's implementation and its main functions:
1. getBlockReport()
Once a DataNode has started up normally, it periodically reports all of the blocks stored on the node to the NameNode. The reporting interval can be set in the DataNode's configuration file via the property dfs.blockreport.intervalMsec.
<pre name="code" class="java">public Block[] getBlockReport() {
  // Gather block info from every volume into one sorted set.
  TreeSet<Block> blockSet = new TreeSet<Block>();
  volumes.getBlockInfo(blockSet);
  // Copy the set into the array that will be sent to the NameNode.
  Block blockTable[] = new Block[blockSet.size()];
  int i = 0;
  for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
    blockTable[i] = it.next();
  }
  return blockTable;
}
</pre>
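As a usage illustration, here is a minimal sketch of how the DataNode's service loop might schedule and send this report. The surrounding names (`conf`, `data`, `namenode`, `dnRegistration`, `lastBlockReport`) and the exact shape of the `blockReport` RPC are assumptions for the sketch, not a quote of the DataNode source:
<pre name="code" class="java">// Hypothetical sketch of the reporting loop; the names used here are assumed.
long blockReportInterval =
    conf.getLong("dfs.blockreport.intervalMsec", 60 * 60 * 1000L);
long now = System.currentTimeMillis();
if (now - lastBlockReport > blockReportInterval) {
  Block[] report = data.getBlockReport();        // 'data' is the FSDataset
  namenode.blockReport(dnRegistration, report);  // assumed RPC to the NameNode
  lastBlockReport = now;
}
</pre>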
2. invalidate(Block[])
After a DataNode successfully registers with the NameNode, it periodically sends heartbeats to tell the NameNode that it is still working properly. When the NameNode receives a heartbeat, it sends back a response, and that response may piggyback commands for the DataNode, such as deleting blocks that are no longer needed on the node (a block may become useless, for example, because a user deleted a file). The heartbeat interval can be set in the DataNode's configuration file via the property dfs.heartbeat.interval.
<pre name="code" class="java">public void invalidate(Block invalidBlks[]) throws IOException {
  boolean error = false;
  for (int i = 0; i < invalidBlks.length; i++) {
    File f = null;
    FSVolume v;
    synchronized (this) {
      // Resolve the block to its file and its volume; any missing piece
      // is logged and the block is skipped.
      f = getFile(invalidBlks[i]);
      DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);
      if (dinfo == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". BlockInfo not found in volumeMap.");
        error = true;
        continue;
      }
      v = dinfo.getVolume();
      if (f == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Block not found in blockMap." + ((v == null) ? " " : " Block found in volumeMap."));
        error = true;
        continue;
      }
      if (v == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". No volume for this block." + " Block found in blockMap. " + f + ".");
        error = true;
        continue;
      }
      File parent = f.getParentFile();
      if (parent == null) {
        DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Parent not found for file " + f + ".");
        error = true;
        continue;
      }
      v.clearPath(parent);
      volumeMap.remove(invalidBlks[i]);
    }
    // Delete the block file and its meta file, then update the volume's
    // space accounting.
    File metaFile = getMetaFile(f, invalidBlks[i]);
    long blockSize = f.length() + metaFile.length();
    if (!f.delete() || (!metaFile.delete() && metaFile.exists())) {
      DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);
      error = true;
      continue;
    }
    v.decDfsUsed(blockSize);
    DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
    if (f.exists()) {
      // Should never happen: the delete above reported success.
      DataNode.LOG.info("File " + f + " was deleted but still exists!");
    }
  }
  if (error) {
    throw new IOException("Error in deleting blocks.");
  }
}
</pre>
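For context, here is a hedged sketch of how a delete command carried by a heartbeat reply might end up calling this method. The command constant and the BlockCommand accessor mirror the DatanodeProtocol of this Hadoop generation, but treat the exact shape as an assumption:
<pre name="code" class="java">// Hypothetical sketch: handling a delete command piggybacked on a heartbeat reply.
// 'cmd' is assumed to be the DatanodeCommand returned from the heartbeat RPC.
if (cmd != null && cmd.getAction() == DatanodeProtocol.DNA_INVALIDATE) {
  BlockCommand bcmd = (BlockCommand) cmd;
  data.invalidate(bcmd.getBlocks());   // the invalidate() shown above
}
</pre>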
3. isValidBlock(Block)
When a block's replica count falls short, the NameNode needs to copy the block to raise it. The NameNode picks a DataNode that already stores the block as the source node to push the block to other DataNodes. Before the source DataNode transfers the block, it first validates it, which in practice just means checking whether the block file exists on the local disk. (If the DataNode is still in the middle of receiving this block, the validation fails as well.)
<pre name="code" class="java">public boolean isValidBlock(Block b) {
  return validateBlockFile(b) != null;
}

/** Return the block file only if it actually exists on local disk. */
File validateBlockFile(Block b) {
  File f = getFile(b);
  if (f != null && f.exists())
    return f;
  if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
    InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
  }
  return null;
}
</pre>
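A tiny usage sketch of the check a source DataNode might perform before serving a replication request; the surrounding names are assumptions:
<pre name="code" class="java">// Hypothetical sketch: refuse a transfer request for a replica we do not hold.
if (!data.isValidBlock(b)) {
  throw new IOException("Block " + b + " is not valid on this DataNode");
}
// ... proceed to open and stream the block ...
</pre>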
4. getLength(Block)
Before a DataNode copies a block to another DataNode, one more piece of information has to be verified: whether the block length recorded by the NameNode matches the actual length of the block file on this DataNode.
<pre name="code" class="java">public long getLength(Block b) throws IOException {
  return getBlockFile(b).length();
}

public synchronized File getBlockFile(Block b) throws IOException {
  File f = validateBlockFile(b);
  if (f == null) {
    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
      InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
    }
    throw new IOException("Block " + b + " is not valid.");
  }
  return f;
}
</pre>
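As an illustration, a sketch of the consistency check described above; comparing against Block.getNumBytes() is an assumed stand-in for the actual verification code:
<pre name="code" class="java">// Hypothetical sketch: compare the NameNode-recorded length with the on-disk length.
long onDisk = data.getLength(b);     // actual block file length
if (onDisk != b.getNumBytes()) {     // length the NameNode believes
  throw new IOException("Block " + b + " length mismatch: expected "
      + b.getNumBytes() + ", found " + onDisk);
}
</pre>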
5. getStoredBlock(long)
When a DataNode performs a recovery/synchronization operation on a block, it compares the block against its other replicas, specifically their generation timestamps and data sizes, and those comparisons need the block's basic information. (This method can obtain the basic information not only of blocks already stored on the DataNode, but also of blocks the DataNode is currently receiving.)
<pre name="code" class="java">/** Find the meta file that belongs to the given block file. */
private static File findMetaFile(final File blockFile) throws IOException {
  final String prefix = blockFile.getName() + "_";
  final File parent = blockFile.getParentFile();
  File[] matches = parent.listFiles(new FilenameFilter() {
    public boolean accept(File dir, String name) {
      return dir.equals(parent) && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
    }
  });

  if (matches == null || matches.length == 0) {
    throw new IOException("Meta file not found, blockFile=" + blockFile);
  }
  else if (matches.length > 1) {
    throw new IOException("Found more than one meta files: "
        + Arrays.asList(matches));
  }
  return matches[0];
}

/** Extract the generation stamp encoded in the meta file's name. */
private static long parseGenerationStamp(File blockFile, File metaFile) throws IOException {
  String metaname = metaFile.getName();
  String gs = metaname.substring(blockFile.getName().length() + 1,
      metaname.length() - METADATA_EXTENSION.length());
  try {
    return Long.parseLong(gs);
  } catch(NumberFormatException nfe) {
    throw (IOException)new IOException("blockFile=" + blockFile
        + ", metaFile=" + metaFile).initCause(nfe);
  }
}

/** Look among blocks being created first, then among finalized blocks. */
public File findBlockFile(long blockId) {
  final Block b = new Block(blockId);
  File blockfile = null;
  ActiveFile activefile = ongoingCreates.get(b);
  if (activefile != null) {
    blockfile = activefile.file;
  }
  if (blockfile == null) {
    blockfile = getFile(b);
  }
  if (blockfile == null) {
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
      DataNode.LOG.debug("volumeMap=" + volumeMap);
    }
  }
  return blockfile;
}

public synchronized Block getStoredBlock(long blkid) throws IOException {
  File blockfile = findBlockFile(blkid);
  if (blockfile == null) {
    return null;
  }
  File metafile = findMetaFile(blockfile);
  return new Block(blkid, blockfile.length(), parseGenerationStamp(blockfile, metafile));
}
</pre>
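To make the naming convention concrete, here is a small self-contained demo of the substring arithmetic in parseGenerationStamp; the block id, generation stamp, and paths are made up for illustration:
<pre name="code" class="java">import java.io.File;

// Demo of the on-disk naming scheme the parsing code relies on:
// block file blk_<id> pairs with meta file blk_<id>_<genstamp>.meta
public class MetaNameDemo {
  static final String METADATA_EXTENSION = ".meta";

  public static void main(String[] args) {
    File blockFile = new File("/data/dfs/current/blk_3141592653589793");   // made-up id
    File metaFile  = new File("/data/dfs/current/blk_3141592653589793_1001.meta");
    String metaname = metaFile.getName();
    // Skip "blk_<id>_" at the front and ".meta" at the back.
    String gs = metaname.substring(blockFile.getName().length() + 1,
                                   metaname.length() - METADATA_EXTENSION.length());
    System.out.println("generation stamp = " + Long.parseLong(gs));       // prints 1001
  }
}
</pre>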
6. getDfsUsed()/getCapacity()/getRemaining()
Every time the DataNode sends a heartbeat to the NameNode, it attaches its current basic status, which includes the state of its storage space (total capacity, space used, space remaining).
<pre name="code" class="java">public long getDfsUsed() throws IOException {
  return volumes.getDfsUsed();
}

public long getCapacity() throws IOException {
  return volumes.getCapacity();
}

public long getRemaining() throws IOException {
  return volumes.getRemaining();
}
</pre>
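A sketch of how these figures might feed the heartbeat; the RPC name and parameter order follow the DatanodeProtocol of this Hadoop generation, but take them as assumptions rather than the exact signature:
<pre name="code" class="java">// Hypothetical sketch: a heartbeat carrying the three storage statistics.
DatanodeCommand cmd = namenode.sendHeartbeat(dnRegistration,
    data.getCapacity(),    // total configured space
    data.getDfsUsed(),     // bytes occupied by block and meta files
    data.getRemaining(),   // bytes still free for HDFS
    xmitsInProgress,       // assumed counter of ongoing block transfers
    getXceiverCount());    // assumed count of active data-serving threads
</pre>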
7. writeToBlock(Block, boolean)
When a DataNode starts receiving a block's data from another DataNode or from a user client, it first has to create the corresponding write streams for the block. The write streams produce two files: a data file and a meta file, where the meta file mainly holds the checksum information for the data file.
<pre name="code" class="java">public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
  // A block that already exists on disk may only be rewritten during recovery.
  if (isValidBlock(b)) {
    if (!isRecovery) {
      throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
    }
    detachBlock(b, 1);
  }
  long blockSize = b.getNumBytes();

  File f = null;
  List<Thread> threads = null;
  synchronized (this) {
    // If another writer already opened this block, only recovery may take
    // over; interrupt the old writer threads first.
    ActiveFile activeFile = ongoingCreates.get(b);
    if (activeFile != null) {
      f = activeFile.file;
      threads = activeFile.threads;
      if (!isRecovery) {
        throw new BlockAlreadyExistsException("Block " + b + " has already been started (though not completed), and thus cannot be created.");
      } else {
        for (Thread thread:threads) {
          thread.interrupt();
        }
      }
      ongoingCreates.remove(b);
    }

    FSVolume v = null;
    if (!isRecovery) {
      // Fresh write: pick a volume with enough room and create a tmp file.
      v = volumes.getNextVolume(blockSize);
      f = createTmpFile(v, b);
      volumeMap.put(b, new DatanodeBlockInfo(v));
    } else if (f != null) {
      // Recovery of a block still being written: reuse its tmp file.
      DataNode.LOG.info("Reopen already-open Block for append " + b);
      v = volumeMap.get(b).getVolume();
      volumeMap.put(b, new DatanodeBlockInfo(v));
    } else {
      // Recovery/append on a finalized block: move its files back to tmp.
      DataNode.LOG.info("Reopen Block for append " + b);
      v = volumeMap.get(b).getVolume();
      f = createTmpFile(v, b);
      File blkfile = getBlockFile(b);
      File oldmeta = getMetaFile(b);
      File newmeta = getMetaFile(f, b);

      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
      if (!oldmeta.renameTo(newmeta)) {
        throw new IOException("Block " + b + " reopen failed. " + " Unable to move meta file " + oldmeta + " to tmp dir " + newmeta);
      }

      DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
      if (!blkfile.renameTo(f)) {
        if (!f.delete()) {
          throw new IOException("Block " + b + " reopen failed. " + " Unable to remove file " + f);
        }
        if (!blkfile.renameTo(f)) {
          throw new IOException("Block " + b + " reopen failed. " + " Unable to move block file " + blkfile + " to tmp dir " + f);
        }
      }
      volumeMap.put(b, new DatanodeBlockInfo(v));
    }

    if (f == null) {
      DataNode.LOG.warn("Block " + b + " reopen failed " + " Unable to locate tmp file.");
      throw new IOException("Block " + b + " reopen failed " + " Unable to locate tmp file.");
    }
    ongoingCreates.put(b, new ActiveFile(f, threads));
  }

  // Wait for any interrupted writer threads to exit before handing out streams.
  try {
    if (threads != null) {
      for (Thread thread:threads) {
        thread.join();
      }
    }
  } catch (InterruptedException e) {
    throw new IOException("Recovery waiting for thread interrupted.");
  }

  File metafile = getMetaFile(f, b);
  DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
  DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
  return createBlockWriteStreams(f, metafile);
}

public boolean detachBlock(Block block, int numLinks) throws IOException {
  DatanodeBlockInfo info = null;
  synchronized (this) {
    info = volumeMap.get(block);
  }
  return info.detachBlock(block, numLinks);
}
</pre>
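For orientation, a hedged sketch of how a block receiver might consume the returned stream pair. The dataOut/checksumOut field names follow BlockWriteStreams in this codebase, while the packet buffer and offsets are made-up stand-ins:
<pre name="code" class="java">// Hypothetical sketch: writing one packet's worth of data and checksums.
// 'pktBuf' and the offsets stand in for the receiver's real buffers.
BlockWriteStreams streams = data.writeToBlock(b, false /* fresh write */);
streams.dataOut.write(pktBuf, dataOff, dataLen);              // block bytes
streams.checksumOut.write(pktBuf, checksumOff, checksumLen);  // matching CRC bytes
</pre>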
8. finalizeBlock(Block)
Once a DataNode has successfully received a block's data, it needs to move the block's data file and meta file to their real location. (If the data originally came from a user and the client fails while transferring it, the received block is still treated as "successful", because after the client recovers it can continue the transfer with an append operation.)
<pre name="code" class="java">public synchronized void finalizeBlock(Block b) throws IOException {
  ActiveFile activeFile = ongoingCreates.get(b);
  if (activeFile == null) {
    throw new IOException("Block " + b + " is already finalized.");
  }
  File f = activeFile.file;
  if (f == null || !f.exists()) {
    throw new IOException("No temporary file " + f + " for block " + b);
  }
  FSVolume v = volumeMap.get(b).getVolume();
  if (v == null) {
    throw new IOException("No volume for temporary file " + f + " for block " + b);
  }

  File dest = v.addBlock(b, f);
  volumeMap.put(b, new DatanodeBlockInfo(v, dest));
  ongoingCreates.remove(b);
}
</pre>
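Putting the pieces together, a hedged sketch of the receive-then-finalize sequence; the receive loop is elided and the names are assumptions:
<pre name="code" class="java">// Hypothetical sketch: a successful reception ends with finalizeBlock.
BlockWriteStreams streams = data.writeToBlock(b, false);
// ... receiver loop writes every packet to streams.dataOut / streams.checksumOut ...
data.finalizeBlock(b);   // move the tmp data + meta files to their final directory
</pre>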
9. unfinalizeBlock(Block)
When a replica of a block needs to be added, the NameNode picks a DataNode that already holds a replica as the source node and selects several other DataNodes as targets. If a target node hits an I/O error while receiving the block, it deletes all information related to that block.
<pre name="code" class="java">public synchronized void unfinalizeBlock(Block b) throws IOException {
  ActiveFile activefile = ongoingCreates.remove(b);
  if (activefile == null) {
    return;
  }
  volumeMap.remove(b);

  if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
    DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
  }
}

private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
  if (blockFile == null) {
    DataNode.LOG.warn("No file exists for block: " + b);
    return true;
  }

  if (!blockFile.delete()) {
    DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
    return false;
  } else {
    if (metaFile != null && !metaFile.delete()) {
      DataNode.LOG.warn("Not able to delete the meta block file: " + metaFile);
      return false;
    }
  }
  return true;
}
</pre>
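A sketch of the failure path that leads here; the receive call is a made-up placeholder:
<pre name="code" class="java">// Hypothetical sketch: a failed reception rolls the replica back completely.
try {
  receivePackets(streams);   // assumed receive loop for the incoming block
  data.finalizeBlock(b);     // success: promote the tmp files
} catch (IOException ioe) {
  data.unfinalizeBlock(b);   // failure: drop tmp files and in-memory state
  throw ioe;
}
</pre>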
10. getBlockInputStream(Block, long)
HDFS supports random reads within a file. When a user wants to read data starting at some position in a file, the HDFS client asks the NameNode which block covers that starting position and which DataNodes store that block; the NameNode also sorts those DataNodes in ascending order of their distance from the client. The client then tries those DataNodes in turn to fetch the data, and the chosen DataNode streams the data starting at the requested offset.
<pre name="code" class="java">public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
  File blockFile = getBlockFile(b);
  RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
  if (seekOffset > 0) {
    blockInFile.seek(seekOffset);
  }
  return new FileInputStream(blockInFile.getFD());
}
</pre>
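Finally, a sketch of how a reader might use this to serve a ranged read; the offset computation and buffer handling are illustrative assumptions:
<pre name="code" class="java">// Hypothetical sketch: stream a block starting at a client-requested offset.
InputStream in = data.getBlockInputStream(b, offsetInBlock);  // offset assumed computed
try {
  byte[] buf = new byte[64 * 1024];
  int n;
  while ((n = in.read(buf)) > 0) {
    out.write(buf, 0, n);   // 'out' is the socket stream to the client (assumed)
  }
} finally {
  in.close();
}
</pre>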
Other important methods of the interface will be discussed in detail in future posts, in the context of concrete scenarios.