Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1959218
  • 博文数量: 1000
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 7921
  • 用 户 组: 普通用户
  • 注册时间: 2013-08-20 09:23
个人简介

storage R&D guy.

文章分类

全部博文(1000)

文章存档

2019年(5)

2017年(47)

2016年(38)

2015年(539)

2014年(193)

2013年(178)

分类: 服务器与存储

2015-07-31 16:14:05

本文作为DataNode节点的数据块管理系列的最后一篇博文,将详细讨论DataNode中直接为它服务而管理文件数据块的一个大家伙FSDatasetInterface,当然FSDatasetInterface只是一个接口,所要讲的主要是它的一个具体实现——FSDataset。FSDataset主要是在FSVolumeSet之上进行操作的,它的核心是为数据块创建I/O流。先来看看与FSDataset相关联的类。


     下面将主要介绍FSDataset的实现及其功能:

1.getBlockReport()

   当DataNode节点正常启动之后,就会定期的向NameNode汇报存储在自己节点上的所有数据块信息。每一次汇报的时间间隔可以通过DataNode节点的配置文件来设置,对应的配置项是:dfs.blockreport.intervalMsec

[java] view plaincopy
  1. public Block[] getBlockReport() {  
  2.   
  3.  TreeSet<Block> blockSet = new TreeSet<Block>();  
  4.  volumes.getBlockInfo(blockSet);//获取“磁盘”上存储的所有数据块信息  
  5.  Block blockTable[] = new Block[blockSet.size()];  
  6.  int i = 0;  
  7.    
  8.  for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {  
  9.    blockTable[i] = it.next();  
  10.  }  
  11.    
  12.  return blockTable;  

2.invalidate(Block[])

   在DataNode节点成功向NameNode注册之后,就会定期的向NameNode发送心跳包,以此来告诉NameNode节点自己当前工作正常。当NameNode接到DataNode节点的心跳包之后会发送一个响应包,同时这个响应包会顺便捎带NameNode给DataNode节点的一些命令,如删除DataNode节点是一些已经无用的数据块(数据块无用可能是用户删除了一个文件造成的)。这个的时间间隔可以通过DataNode的配置文件来设置,对应的配置项:dfs.heartbeat.interval

[java] view plaincopy
  1.  public void invalidate(Block invalidBlks[]) throws IOException {  
  2.     boolean error = false;  
  3.     for (int i = 0; i < invalidBlks.length; i++) {  
  4.       File f = null;  
  5.       FSVolume v;  
  6.       synchronized (this) {  
  7.         f = getFile(invalidBlks[i]);//获取数据块对应的数据文件  
  8.         DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);//找到数据块的位置信息  
  9.         if (dinfo == null) {  
  10.           DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] +  ". BlockInfo not found in volumeMap.");  
  11.           error = true;  
  12.           continue;  
  13.         }  
  14.         v = dinfo.getVolume();  
  15.         if (f == null) {  
  16.           DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Block not found in blockMap." + ((v == null) ? " " : " Block found in volumeMap."));  
  17.           error = true;  
  18.           continue;  
  19.         }  
  20.         if (v == null) {  
  21.           DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". No volume for this block." + " Block found in blockMap. " + f + ".");  
  22.           error = true;  
  23.           continue;  
  24.         }  
  25.         File parent = f.getParentFile();  
  26.         if (parent == null) {  
  27.           DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + ". Parent not found for file " + f + ".");  
  28.           error = true;  
  29.           continue;  
  30.         }  
  31.         v.clearPath(parent);//数据块所在的存储目录的blocks数量依次减1  
  32.         volumeMap.remove(invalidBlks[i]);//清除数据块的位置映射信息<pre name="code" class="java">      }  
  33.         
  34.       File metaFile = getMetaFile( f, invalidBlks[i] );//获取数据块对应的元数据文件  
  35.       long blockSize = f.length()+metaFile.length();  
  36.       //删除数据块对应的数据文件和元数据文件  
  37.       if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {  
  38.         DataNode.LOG.warn("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);  
  39.         error = true;  
  40.         continue;  
  41.       }  
  42.       v.decDfsUsed(blockSize);//更新数据块所在“分区”的空间使用情况  
  43.       DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);  
  44.       if (f.exists()) {  
  45.         //  
  46.         // This is a temporary check especially for hadoop-1220.   
  47.         // This will go away in the future.  
  48.         //  
  49.         DataNode.LOG.info("File " + f + " was deleted but still exists!");  
  50.       }  
  51.     }  
  52.       
  53.     if (error) {  
  54.       throw new IOException("Error in deleting blocks.");  
  55.     }  
  56.       
  57.   }</pre>  
  58. <pre></pre>  
  59. <p></p>  
  60. <p><span style="font-size:18px"><span style="font-size:18px">3.isValidBlock(Block)</span></span></p>  
  61. <p><span style="font-size:18px"><span style="font-size:18px">    当一个数据块的副本数不够的时候,NameNode就需要复制这个数据块来增加它的副本数。NameNode会选择一个存有这个数据块的DataNode节点作为主动节点来向其它的DataNode节点传送这个数据块。在主动DataNode节点传送这个数据块之前会先检测这个数据节点,实际上就是判断本地磁盘上有没有这个数据块文件。(如果DataNode节点正在接受这个数据块时,验证也是通不过的)</span></span></p>  
  62. <p><span style="font-size:18px"><span style="font-size:18px"></span></span></p>  
  63. <pre name="code" class="java">public boolean isValidBlock(Block b) {  
  64.     return validateBlockFile(b) != null;  
  65.   }  
  66.   
  67.   /** 
  68.    * Find the file corresponding to the block and return it if it exists. 
  69.    */  
  70.   File validateBlockFile(Block b) {  
  71.     //获取数据块对应的数据文件  
  72.     File f = getFile(b);  
  73.       
  74.     if(f != null && f.exists())  
  75.       return f;  
  76.       
  77.     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {  
  78.       InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);  
  79.     }  
  80.       
  81.     return null;  
  82.   }  
  83. </pre><br>  
  84. <span style="font-size:18px">4.getLength(Block)<br>  
  85. </span>  
  86. <p><span style="font-size:18px">   在DataNode节点向其它DataNode节点>复制数据块时还需要验证一个信息,就是NameNode保存的该数据块的长度是否和DataNode节点上真实的文件长度一致。</span></p>  
  87. <p></p>  
  88. <pre name="code" class="java">  public long getLength(Block b) throws IOException {  
  89.     return getBlockFile(b).length();  
  90.   }  
  91.   
  92.   /** 
  93.    * Get File name for a given block. 
  94.    */  
  95.   public synchronized File getBlockFile(Block b) throws IOException {  
  96.     File f = validateBlockFile(b);  
  97.     if(f == null) {  
  98.       if (InterDatanodeProtocol.LOG.isDebugEnabled()) {  
  99.         InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);  
  100.       }  
  101.       throw new IOException("Block " + b + " is not valid.");  
  102.     }  
  103.       
  104.     return f;  
  105.   }  
  106. </pre>  
  107. <p><br>  
  108. </p>  
  109. <p><span style="font-size:18px">5.getStoredBlock(long)<br>  
  110.    当DataNode节点对某一个数据块进行恢复/同步操作的时候,会和该数据块的其它副本进行比较,即比较时间戳和数据大小,而这些信息需要获取到数据块的基本信息。(它不仅能够获取到在DataNode上已经存在的数据块基本信息,而且还能获取DataNode节点当前正在接受的数据块)</span><br>  
  111. </p>  
  112. <pre name="code" class="java"/*获取数据块对应的元数据文件*/  
  113.   private static File findMetaFile(final File blockFile) throws IOException {  
  114.     final String prefix = blockFile.getName() + "_";  
  115.     final File parent = blockFile.getParentFile();  
  116.     File[] matches = parent.listFiles(new FilenameFilter() {  
  117.       public boolean accept(File dir, String name) {  
  118.         return dir.equals(parent) && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);  
  119.       }  
  120.     });  
  121.   
  122.     if (matches == null || matches.length == 0) {  
  123.       throw new IOException("Meta file not found, blockFile=" + blockFile);  
  124.     }  
  125.     else if (matches.length > 1) {  
  126.       throw new IOException("Found more than one meta files: "   
  127.           + Arrays.asList(matches));  
  128.     }  
  129.     return matches[0];  
  130.   }  
  131.     
  132.   /** 通过解析数据块的元数据文件名来获取该数据块的版本时间戳 */  
  133.   private static long parseGenerationStamp(File blockFile, File metaFile) throws IOException {  
  134.     String metaname = metaFile.getName();  
  135.     String gs = metaname.substring(blockFile.getName().length() + 1,  
  136.         metaname.length() - METADATA_EXTENSION.length());  
  137.     try {  
  138.       return Long.parseLong(gs);  
  139.     } catch(NumberFormatException nfe) {  
  140.       throw (IOException)new IOException("blockFile=" + blockFile  
  141.           + ", metaFile=" + metaFile).initCause(nfe);  
  142.     }  
  143.   }  
  144.   
  145.   /** 获取数据块对应的数据文件 */   
  146.   public File findBlockFile(long blockId) {  
  147.     final Block b = new Block(blockId);  
  148.     File blockfile = null;  
  149.     ActiveFile activefile = ongoingCreates.get(b);  
  150.     if (activefile != null) {  
  151.       blockfile = activefile.file;  
  152.     }  
  153.     if (blockfile == null) {  
  154.       blockfile = getFile(b);  
  155.     }  
  156.     if (blockfile == null) {  
  157.       if (DataNode.LOG.isDebugEnabled()) {  
  158.         DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);  
  159.         DataNode.LOG.debug("volumeMap=" + volumeMap);  
  160.       }  
  161.     }  
  162.     return blockfile;  
  163.   }  
  164.   
  165.   /** {@inheritDoc} */  
  166.   public synchronized Block getStoredBlock(long blkid) throws IOException {  
  167.     File blockfile = findBlockFile(blkid);  
  168.     if (blockfile == null) {  
  169.       return null;  
  170.     }  
  171.     File metafile = findMetaFile(blockfile);  
  172.     //数据块的基本信息包括id、大小、版本  
  173.     return new Block(blkid, blockfile.length(), parseGenerationStamp(blockfile, metafile));  
  174.   }  
  175. </pre><br>  
  176. <p><span style="font-size:18px">6.getDfsUsed()/getCapacity()/getRemaining()</span></p>  
  177. <p>    <span style="font-size:18px"> DataNode节点每一次向NameNode节点发送心跳包的时候都会带上自己当前的基本信息,这个基本信息其中就包括自己当前存储空间的状态信息(总容量,使用量,剩余量)。</span></p>  
  178. <p><span style="font-size:18px"></span></p>  
  179. <pre name="code" class="java">  public long getDfsUsed() throws IOException {  
  180.     return volumes.getDfsUsed();  
  181.   }  
  182.     
  183.   public long getCapacity() throws IOException {  
  184.     return volumes.getCapacity();  
  185.   }  
  186.   
  187.   public long getRemaining() throws IOException {  
  188.     return volumes.getRemaining();  
  189.   }  
  190. </pre>  
  191. <p></p>  
  192. <p><span style="font-size:18px">7.writeToBlock(Block,boolean)</span></p>  
  193. <p><span style="font-size:18px">    当DataNode节点开始接受来自其它DataNode节点或者用户客户端传过来的一个Block数据时就需要先为该数据块创建对应的写入流,这个写入流会产生两个文件,一个是数据文件,一个是元数据文件,这个元数据文件主要包括数据文件对应的校验和信息。</span></p>  
  194. <p><span style="font-size:18px"></span></p>  
  195. <pre name="code" class="java">  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {  
  196.    //检查Block是否已经存在  
  197.    if (isValidBlock(b)) {  
  198.       if (!isRecovery) {//如果Block已经存在,而又不是append则抛出异常  
  199.          throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");  
  200.       }  
  201.       // The other reason is that an "append" is occurring to this block.  
  202.       detachBlock(b, 1);  
  203.     }  
  204.       
  205.     long blockSize = b.getNumBytes();  
  206.   
  207.     //  
  208.     File f = null;  
  209.     List<Thread> threads = null;  
  210.     synchronized (this) {  
  211.       //查看Block是否已经正在接收  
  212.       ActiveFile activeFile = ongoingCreates.get(b);  
  213.       if (activeFile != null) {  
  214.         f = activeFile.file;  
  215.         threads = activeFile.threads;  
  216.           
  217.         if (!isRecovery) {//DataNode节点当前正在接收该Block,当此次又不是append所以出错  
  218.           throw new BlockAlreadyExistsException("Block " + b + " has already been started (though not completed), and thus cannot be created.");  
  219.         } else {  
  220.           //停止所有正在接收该Block的线程  
  221.           for (Thread thread:threads) {  
  222.             thread.interrupt();  
  223.           }  
  224.         }  
  225.         ongoingCreates.remove(b);  
  226.       }  
  227.         
  228.       FSVolume v = null;  
  229.       if (!isRecovery) {  
  230.         v = volumes.getNextVolume(blockSize);  
  231.         // 为Block创建一个临时中间文件来接收它的数据  
  232.         f = createTmpFile(v, b);  
  233.         volumeMap.put(b, new DatanodeBlockInfo(v));  
  234.       } else if (f != null) {  
  235.         DataNode.LOG.info("Reopen already-open Block for append " + b);  
  236.         // create or reuse temporary file to hold block in the designated volume  
  237.         v = volumeMap.get(b).getVolume();  
  238.         volumeMap.put(b, new DatanodeBlockInfo(v));  
  239.       } else {  
  240.         // reopening block for appending to it.  
  241.         DataNode.LOG.info("Reopen Block for append " + b);  
  242.         v = volumeMap.get(b).getVolume();  
  243.         f = createTmpFile(v, b);//为Block创建一个临时数据文件  
  244.         File blkfile = getBlockFile(b);//获取Block对应的数据文件  
  245.         File oldmeta = getMetaFile(b);//获取Block对应的元数据文件  
  246.         File newmeta = getMetaFile(f, b);//获取Block的新临时元数据文件  
  247.   
  248.         // rename meta file to tmp directory  
  249.         DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);  
  250.         if (!oldmeta.renameTo(newmeta)) {  
  251.           throw new IOException("Block " + b + " reopen failed. " + " Unable to move meta file  " + oldmeta + " to tmp dir " + newmeta);  
  252.         }  
  253.   
  254.         // rename block file to tmp directory  
  255.         DataNode.LOG.debug("Renaming " + blkfile + " to " + f);  
  256.         if (!blkfile.renameTo(f)) {  
  257.           if (!f.delete()) {  
  258.             throw new IOException("Block " + b + " reopen failed. " + " Unable to remove file " + f);  
  259.           }  
  260.           if (!blkfile.renameTo(f)) {  
  261.             throw new IOException("Block " + b + " reopen failed. " + " Unable to move block file " + blkfile + " to tmp dir " + f);  
  262.           }  
  263.         }  
  264.         volumeMap.put(b, new DatanodeBlockInfo(v));  
  265.       }  
  266.   
  267.       if (f == null) {  
  268.         DataNode.LOG.warn("Block " + b + " reopen failed " + " Unable to locate tmp file.");  
  269.         throw new IOException("Block " + b + " reopen failed " + " Unable to locate tmp file.");  
  270.       }  
  271.       ongoingCreates.put(b, new ActiveFile(f, threads));//记录DataNode节点正在接收该Block  
  272.    }  
  273.   
  274.     //等待所有正在接收该Block的线程结束  
  275.     try {  
  276.       if (threads != null) {  
  277.         for (Thread thread:threads) {  
  278.           thread.join();  
  279.         }  
  280.       }  
  281.     } catch (InterruptedException e) {  
  282.       throw new IOException("Recovery waiting for thread interrupted.");  
  283.     }  
  284.   
  285.     //获取该Block的元数据文件  
  286.     File metafile = getMetaFile(f, b);  
  287.     DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());  
  288.     DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());  
  289.     //为该Block的临时数据文件和元数据文件创建一个写入流  
  290.     return createBlockWriteStreams( f , metafile);  
  291.   }  
  292.   
  293.  public boolean detachBlock(Block block, int numLinks) throws IOException {  
  294.     DatanodeBlockInfo info = null;  
  295.   
  296.     synchronized (this) {  
  297.       info = volumeMap.get(block);  
  298.     }  
  299.     return info.detachBlock(block, numLinks);  
  300.   }  
  301. </pre>  
  302. <p></p>  
  303. <p><span style="font-size:18px">7.finalizeBlock(Block)</span></p>  
  304. <p>     <span style="font-size:18px">当DataNode节点成功的接收了一个Block的数据之后,就需要把这个Block的数据文件和元数据文件移动到它真正的位置处。(如果是数据的最开始来源于用户,当用户在传送数据时客户端出现异常,则仍然认为接收的Block的是“成功”的,因为客户端在恢复之后选择append操作仍然可以接着再传)</span><br>  
  305. </p>  
  306. <p><span style="font-size:18px"></span></p>  
  307. <pre name="code" class="java">public synchronized void finalizeBlock(Block b) throws IOException {  
  308.     ActiveFile activeFile = ongoingCreates.get(b);  
  309.     if (activeFile == null) {  
  310.       throw new IOException("Block " + b + " is already finalized.");  
  311.     }  
  312.     File f = activeFile.file;  
  313.     if (f == null || !f.exists()) {  
  314.       throw new IOException("No temporary file " + f + " for block " + b);  
  315.     }  
  316.     FSVolume v = volumeMap.get(b).getVolume();  
  317.     if (v == null) {  
  318.       throw new IOException("No volume for temporary file " + f + " for block " + b);  
  319.     }  
  320.           
  321.     File dest = null;  
  322.     dest = v.addBlock(b, f);//将Block的数据文件和元数据文件移动到分配的“分区”中  
  323.     volumeMap.put(b, new DatanodeBlockInfo(v, dest));  
  324.     ongoingCreates.remove(b);//消除Block正在创建的记录  
  325.   }  
  326. </pre><br>  
  327. <span style="font-size:18px">8.unfinalizeBlock(Block)</span>  
  328. <p></p>  
  329. <p><span style="font-size:18px">   当我增加一个Block副本的时候,NameNode节点会选择一个已存在该Block副本的DataNode节点作为主节点,然后选择若干个其它DataNode节点作为备份节点,如果备份节点在接受这个Block的数据时发生I/O异常,就会删除与这个Block相关的信息<br>  
  330. </span></p>  
  331. <p><span style="font-size:18px"></span></p>  
  332. <pre name="code" class="java">public synchronized void unfinalizeBlock(Block b) throws IOException {  
  333.     // 消除Block正在创建记录  
  334.     ActiveFile activefile = ongoingCreates.remove(b);  
  335.     if (activefile == null) {  
  336.       return;  
  337.     }  
  338.     volumeMap.remove(b);  
  339.       
  340.     //从磁盘上删除该Block的临时数据文件和元数据文件  
  341.     if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {  
  342.       DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );  
  343.     }  
  344.   }  
  345.   
  346.  private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {  
  347.     if (blockFile == null) {  
  348.       DataNode.LOG.warn("No file exists for block: " + b);  
  349.       return true;  
  350.     }  
  351.       
  352.     if (!blockFile.delete()) {  
  353.       DataNode.LOG.warn("Not able to delete the block file: " + blockFile);  
  354.       return false;  
  355.     } else { // remove the meta file  
  356.       if (metaFile != null && !metaFile.delete()) {  
  357.         DataNode.LOG.warn( "Not able to delete the meta block file: " + metaFile);  
  358.         return false;  
  359.       }  
  360.     }  
  361.       
  362.     return true;  
  363.   }  
  364. </pre><br>  
  365. <span style="font-size:18px">9.getBlockInputStream(Block,long)</span>  
  366. <p></p>  
  367. <p><span style="font-size:18px">   HDFS支持文件的随机读,当用户想要从某一个文件中读取某个位置以后的若干数据时,HDFS的用户客户端会向<span style="font-size:18px">NameNode节点</span>请求该文件的起始位置位于哪一个数据块并且该数据块存放在那些DataNode节点上,而NameNode同时也会按照客户端与这些DataNode节点的距离进行升序排序。从NameNode返回之后,客户端会依次尝试从这些DataNode节点来获取数据,对应的DataNode也会从指定位置开始传送这些数据。<br>  
  368. </span></p>  
  369. <p><span style="font-size:18px"></span></p>  
  370. <pre name="code" class="java">  public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {  
  371.   
  372.     File blockFile = getBlockFile(b);  
  373.     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");  
  374.     if (seekOffset > 0) {  
  375.       blockInFile.seek(seekOffset);  
  376.     }  
  377.       
  378.     return new FileInputStream(blockInFile.getFD());  
  379.   }  
  380. </pre>  
  381. <p></p>  
  382. <p></p>  
  383.      <span style="font-size:18px">关于接口的其它重要的方法,我将在以后的博文中结合实际情况来详细的讨论。</span><br>  
  384. <pre></pre>  
  385.       
  386.         <div style="padding-top:20px">           
  387.             <p style="font-size:12px;">版权声明:本文为博主原创文章,未经博主允许不得转载。</p>  
阅读(1257) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~