Chinaunix首页 | 论坛 | 博客
  • 博客访问: 853259
  • 博文数量: 581
  • 博客积分: 7803
  • 博客等级: 少将
  • 技术积分: 3653
  • 用 户 组: 普通用户
  • 注册时间: 2007-04-27 08:21
文章分类

全部博文(581)

文章存档

2013年(7)

2012年(414)

2011年(159)

2009年(1)

分类:

2011-11-10 12:57:25

原文地址:Hadoop-0.20学习笔记(5) 作者:tony_ayuan

本次记录MapReduce与HDFS的结合。不打算细致的分析代码的所有细节,目标是理解MapReduce是怎么run在HDFS上的。

 

HDFS的Client

现在仍然不明朗的问题是Client怎么与HDFS交互?

一般FS通常的做法是提供read、write系统调用,然后由Client调用。

HDFS本身是Java代码,Client也是Java代码。

如果提供系统调用,那么需要用C实现VFS内核接口,app函数栈是Java->C->Java,usr->kernel->usr有点罗嗦,这种实现也叫做FUSE。

另外剩下的可能就是提供一个lib,由client调用,而不通过VFS层。

?Who is client ?

bin/hadoop fs命令称为FsShell,作用很像linux系统里的Shell。它会不会是HDFS的一个Client?

这个脚本里的注释说:“run a generic filesystem user client”。刚才的推测似乎有点眉目。

elif [ "$COMMAND" = "fs" ] ; then     
  CLASS=org.apache.hadoop.fs.FsShell
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

shell脚本里说明FsShell实际的java类是org.apache.hadoop.fs.FsShell,可是这个类的实现里没有调用open,close,read,write之类。

 

按理说POSIX定义了open,read等的标准API,想想C语言的用户程序,HDFS也应该有这些操作才对。

按照这个思路,找到了DFSClient,这里有基本的open,close,read,append操作。

/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks.  It uses the ClientProtocol
* to communicate with a NameNode daemon, and connects
* directly to DataNodes to read/write block data.
*
* Hadoop DFS users should obtain an instance of
* DistributedFileSystem, which uses DFSClient to handle
* filesystem tasks.
*
********************************************************/

?FsShell为什么不使用DFSClient呢?

 

上面的问题暂且放一边。继续查看代码。

通过DistributedFileSystem使用DFSClient。

从注释上看,end-user应用程序应该使用它与HDFS交互。

/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
* This object is the way end-user code interacts with a Hadoop
* DistributedFileSystem.
*
*****************************************************************/

DistributedFileSystem extends FileSystem,而且open,read等操作都是对DFSClient的封装。

 

再仔细看看FileSystem类,open,read等操作都是抽象接口。(好像是“策略模式”)

public abstract class FileSystem

/****************************************************************
* An abstract base class for a fairly generic filesystem.  It
* may be implemented as a distributed filesystem, or as a "local"
* one that reflects the locally-connected disk.  The local version
* exists for small Hadoop instances and for testing.
*
*
* All user code that may potentially use the Hadoop Distributed
* File System should be written to use a FileSystem object.  The
* Hadoop DFS is a multi-machine system that appears as a single
* disk.  It's useful because of its fault tolerance and potentially
* very large capacity.
*
* The local implementation is {@link LocalFileSystem} and distributed
* implementation is DistributedFileSystem.
*****************************************************************/

注释里说了,user code都应该使用FileSystem。

看到这里应该明白了吧!FileSystem /  DistributedFileSystem / DFSClient 越来越底层,越来越偏向具体实现。

 

再回头看看FsShell的实现,使用的全部都是FileSystem,与前面的代码结合起来了。

也证明了前头“FsShell是HDFS的一个Client的推测”是正确的。

 

MapReduce与HDFS的结合点

按照前面的分析,如果MapReduce是HDFS的Client话,那么MapReduce中的文件操作应该使用FileSystem类,代码中会有大量FileSystem类。

hadoop/src/mapred/代码中总共获得FileSystem对象的引用450多次,XXX.getFileSystem(conf)使用的次数更多,不再详细探究。

具体看个例子吧,还是WordCount.java,代码很简洁。

 

./examples/org/apache/hadoop/examples/WordCount.java

Mapper,Reducer仅仅包含处理逻辑,不涉及IO操作。

main中大部分代码是set job,而job启动代码就是最后的job.waitForCompletion(true)。

与FileIO相关的也就两句:

  • FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  • FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

FileInputFormat.addInputPath将path解析,然后设置job的configuration,即mapred.input.dir。

FileOutputFormat.setOutputPath类似,设置了mapred.output.dir。

 

job.waitForCompletion中理应调用conf获得input,output目录,作为mapred的输入,输出。
该方法的注释说Submit the job to the cluster and wait for it to finish. 这就是咱们要找的start point。

./mapred/org/apache/hadoop/mapreduce/Job.java

核心代码也就两句submit,isSuccessful。

  • if (state == JobState.DEFINE) {
      submit();
    }
  • ……
  • return isSuccessful();

提交Job,然后检查结果。按照经验,isSuccessful过程阻塞等待job结束。

 

Submit就是关键,Fire,XXX。

/ * Submit the job to the cluster and return immediately. */

代码:

  • ……
  • // Connect to the JobTracker and submit the job
        connect();
  • info = jobClient.submitJobInternal(conf);
  • state = JobState.RUNNING;

JobTracker看名字是与monitor,debug等相关。state是设置job状态。

剩下的就是jobClient.submitJobInternal(conf)。

 

jobClient是什么呢?原来是在connect生成的。

  • jobClient = new JobClient((JobConf) getConfiguration());

根据conf构建一个jobClient对象。Hadoop说:要有JobClient,于是就有了jobClient。

 

./mapred/org/apache/hadoop/mapred/JobClient.java

JobClient类是实际干活的,Job就是马甲,还是策略模式。当某人玩策略下大棋的时候,干活的另有其人。

* JobClient provides facilities to submit jobs, track their
* progress, access component-tasks' reports/logs, get the Map-Reduce cluster
* status information etc.

job的提交过程包括:

  1. 检查input,output约定。也就是conf里面的设定
  2. 计算InputSplit。这个是真正的输入数据集,这些数据存储在HDFS上。
  3. 按需设置Cache的需求量。
  4. 复制job程序体和配置信息到MapReduce在HDFS上的“系统目录”。
  5. 提交job给JobTracker。

从这个过程看,前面的都是prepare,第5步是point。另外,第2、3步也是存储相关的。

 

这里的注释还给出了一个Hadoop中的MapReduce程序框架,与WordCount里的用法一致。

*     // Create a new JobConf
*     JobConf job = new JobConf(new Configuration(), MyJob.class);
*    
*     // Specify various job-specific parameters    
*     job.setJobName("myjob");
*    
*     job.setInputPath(new Path("in"));
*     job.setOutputPath(new Path("out"));
*    
*     job.setMapperClass(MyJob.MyMapper.class);
*     job.setReducerClass(MyJob.MyReducer.class);

*
*     // Submit the job, then poll for progress until the job is complete
*     JobClient.runJob(job);

 

还是回到原来的思路,jobClient.submitJobInternal(conf)。从代码上看,个人感觉应该调用jobClient.submitJob(conf)。

不得不说这个方法是个大家伙。一上来就是return,而且return的ugi.doAs方法里参数是一个巨大的匿名内部类,一下还看不清ugi.doAs需要几个参数。

  • private UserGroupInformation ugi;
  • ugi.doAs(PrivilegedExceptionAction)
  • * Run the given action as the user

以指定的用户运行授权的动作。gui.doAs且放一边,action。

 

这个基于PrivilegedExceptionAction匿名内部类仅实现了run方法,包括几行preparation,紧跟一个大的try-catch-finally。

根据try子句内部的注释,将run的主要内容分为5部分:

  1. 获得目录的代理令牌,通俗的说就是“获得授权”。
  2. 检查output约定。
  3. 创建job split。writeSplits
  4. 记录queue的ACL,或者说是运行队列的访问权限。
  5. 提交job。jobSubmitClient.submitJob。

这5步与前面注释里描述的job的提交过程很像。前面说过,感兴趣的是1, split的创建,2, job的提交。

 

split的创建

writeSplits返回的是Mapper task数量。

  • if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;

看着代码,回想一下WordCount的执行过程:mapper的数量增加,从0%到100%。

这里的writeNewSplits,writeOldSplits总感觉与这个过程有联系。

 

先看看writeNewSplits

  • Configuration conf = job.getConfiguration();
  • InputFormat input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  • List splits = input.getSplits(job);
  • T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
  • // sort the splits into order based on size, so that the biggest go first
  • Arrays.sort(array, new SplitComparator());
  • JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
  • return array.length;

根据conf的设定,使用反射找到具体的Input类;生成splits,然后排序;创建split files。

剩下的就是,需要明白InputSplit,input.getSplits,JobSplitWriter.createSplitFiles。

 

InputSplit是个抽象类,表示一个Mapper要处理的数据。InputSplit将数据看做是bytes。

注释中还说,job的RecordReader将把InputSplit转化为record。

InputSplit仅有2个抽象方法,getLength,getLocations。

getLength返回split中bytes的个数,后面用了比较和排序。

getLocations返回split所在节点的node name列表,字符串数组。

 

input.getSplits将输入在“逻辑上划分”为许多splits,也就是说split是个逻辑概念,不是物理上将数据拆分。

从使用反射的代码以及前面的FileInputFormat推测,InputFormat类会是一个抽象接口。

job的InputFormat在MapReduce中起3个作用:

  1. 指定输入类型,据此可以check输入。
  2. 将输入的文件集划分为logical的Splits,每个split分配给一个mapper。
  3. 提供RecordReader实现。

FileInputFormat的默认行为是按照输入文件目录的总大小划分为InputSplit。

那么通常的做法就是totalsize / nodesnumber,具体还要看文件在HDFS上是怎么分布存储的。

另外,InputSplit划分的大小有一个上限,就是块大小,这里是默认的64MB。也就是说一个Mapper最大的工作量是一个块。

InputSplit大小的下限通过配置文件设定。

很多时候按照byte为单位划分工作量是没有意义的,因为很多App处理的是一个个对象(record)。

因此,应该按照具体record为单位划分工作量。这时候需要前面提到的RecordReader将工作量按record进行划分。

 

WordCount使用的是FileInputFormat,它也是通常的InputFormat。具体看看getSplits实现。

./mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

FileInputFormat.getSplits基本过程如下:

  1. 选择合适的split size。
  2. 通过FileSystem引用获得blkLocations数组,这里与具体的HDFS联系起来了。
  3. 根据file.length和isSplitable分为三种情况(有内容可拆分,有内容不可拆分,无内容)分别处理。
  4. 返回splits。

在FileInputFormat中length>0的单一文件总是默认可拆分。

“无内容”的文件不需mapper处理,hosts列表为空。

“有内容可拆分”部分是主要的代码。

  • long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long bytesRemaining = length;
  • while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
      splits.add(new FileSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
  • if (bytesRemaining != 0) {
      splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));
    }

splitSize的计算前面提过,Max(minSize,Min(blockSize,maxSize))

而blkLocations一猜就知道是按照文件逻辑地址顺序排列,一会儿探讨。

FileSplit的内容也知道了,就是(文件路径,该split在文件中的起始地址,该split大小,该split所在节点)

SPLIT_SLOP这里是1.1,也是就说如果剩余部分不超过splitsize的10%就不会拆分。

 

另外,blkLocations的获取需要看看,它的具体存储节点很有意思。前两节实际运行Hadoop时发现:HDFS对大文件块的存储节点选择是动态决策的。

此处我的WordCount运行在HDFS上,它的FileSystem对应的具体类型是DistributedFileSystem,于是找到:

  • dfs.getBlockLocations(getPathName(file.getPath()), start, len);

回忆前面FileSystem / DistributedFileSystem / DFSClient的关系,具体代码是DFSClient.getBlockLocations。

方法的注释* It returns a set of hostnames for every block within the indicated region.

DFSClient.getBlockLocations输入的参数是path,start,len。具体过程是:

  • 查询NameNode获得LocatedBlocks对象。查询依据path,start,len。
  • 遍历LocatedBlocks对象中的所有LocatedBlock对象blk,获取对应的DatanodeInfo[]。
    • 遍历DatanodeInfo[],获取names[], hosts[], racks[], 继而设置blkLocations[]
    • blkLocations[idx] = new BlockLocation(names, hosts, racks, blk.getStartOffset(), blk.getBlockSize());
  • 返回blkLocations[]

BlockLocation中记录一个blk区域(start,length)和对应3组名称集{names,hosts,racks}。

name = hostname:portNumber

host = hostname (no portnumber,NameNode使用Ip,DataNode可以使用ip和DNS)

rack = new BaseNode(name, networklocation)

这里的networklocation是xml配置里设置的代表机架的逻辑名称,树状结构,有点像文件系统的目录空间。

 

到这里input.getSplits完成任务,小结一下当文件系统为HDFS时getSplits的过程:

首先,计算合适的splitsize;

然后,查询HDFS的NameNode获得文件的数据分布情况BlockLocations[];

最后,根据splitsize和BlockLocations[]计算splits[]。

split的内容是:文件路径,数据地址范围,所在节点。

 

JobSplitWriter.createSplitFiles

JobSplitWriter被JobClient用来write splits,包括meta和raw data。

从参数列表看,splits的存储位置是jobSubmitDir。

从MapReduce的角度看,写下的splits仅仅是逻辑上的如何划分工作量的记录,不包含应用数据。

createSplitFiles方法的核心代码有4句,两部分rawdata,metadata:

  • FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
  • SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
  • out.close();
  • writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);

createFile在HDFS时最终调用了DFSClient.create,具体细节在下次分析HDFS再看。

writeNewSplits将split对象序列化并写入jobsplit file,返回的SplitMetaInfo[] 。

SplitMetaInfo[] 记录了split的locations——是个string[],以及在jobsplit file里的offset,length。

writeJobSplitMetaInfo将split的存储信息都写入jobsplit meta file中,

包括meta file标示(UTF-8编码的META-SPL),split版本(目前是1),SplitMetaInfo[]长度,每个SplitMetaInfo对象。

可以看出,meta file可用于快速定位某个split在文件中的具体位置。

 

至此,split的创建结束了,本过程小结:

split是关于如何划分MapReduce里mapper的工作量的对象。

回顾一下,MapReduce是将计算迁移到数据所在的节点,不会迁移数据;

可以说split是MapReduce对数据的划分,File是文件系统对数据的划分,block是存储系统对数据的划分。

App Mapper向下看到的是MapReduce的split,MapReduce向下看到的是HDFS的File,HDFS向下看到的是Block。

 

job的提交

jobSubmitClient.submitJob。什么是jobSubmitClient?

private JobSubmissionProtocol jobSubmitClient;

JobSubmissionProtocol 是一个interface。

/**
* Protocol that a JobClient and the central JobTracker use to communicate.  The
* JobClient can use these methods to submit a Job for execution, and learn about
* the current system status.
*/

public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts)  throws IOException;

JobClient和JobTracker通信的接口,这个可能与RPC有关。

那么jobSubmitClient具体的对象是什么?找到了init()方法,核心代码如下:

  • String tracker = conf.get("mapred.job.tracker", "local");
  • if ("local".equals(tracker)) {
      conf.setNumMapTasks(1);
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    }

根据docs/mapred-default.html,找到mapred.job.tracker的解释。

The host and port that the MapReduce job tracker runs at. If "local", then jobs are run in-process as a single map and reduce task.

这次使用hadoop的相关配置是conf/mapred-site.xml


    mapred.job.tracker
    192.168.15.62:9001

那么就是通过createRPCProxy()获得jobSubmitClient对象。

RPC.getProxy(JobSubmissionProtocol.class,
       JobSubmissionProtocol.versionID, addr,
       UserGroupInformation.getCurrentUser(), conf,
       NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class))

JobSubmissionProtocol.versionID在这里是28。

addr在这里是192.168.15.62:9001。

NetUtils.getSocketFactory() 在这里是javax.net.SocketFactory。

 

./core/org/apache/hadoop/ipc/RPC.java

/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */

通常在RPC里的proxy也叫做stub,客户端和服务端都有,在client处代表server,在server处代表client,典型的中介。

相关的动词常用marshal,unmarshal,通俗的说也就是打包,解包。

回到代码,核心部分:

  • VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(protocol, addr, ticket, conf, factory));
  • return proxy;

java.lang.reflect.Proxy; 涉及interposition,参见:

http://java.sun.com/developer/technicalArticles/JavaLP/Interposing/

whatever,jobSubmitClient.submitJob()经过网络调用的是server.submitJob(),而这里的server是JobTracker。

 

./mapred/org/apache/hadoop/mapred/JobTracker.java

先检查类的声明JobTracker implements …, JobSubmissionProtocol, … { … },与前面的结论一致。

* JobTracker.submitJob() kicks off a new job. 意思是说:正式开始new job,前面的都是为它准备。job本尊驾到。

这是个超过4屏的方法,其主体分3部分,两次加锁同步。

  • 第一次加锁同步,根据jobID查看job是否已经运行。如果是则返回JobStatus,否则创建JobInfo对象。
  • 创建JobInProgress。job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
  • 第二次加锁同步,前面是一堆检查,然后是核心status = addJob(jobId, job)。检查的内容包括queue,ACL,MEM。

个人感觉第二次加锁的范围有点大,检查的内容可以放到外面。

 

private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
  throws IOException {}

addJob的声明包含了同步,submitJob()的第二次加锁很多余。

* Adds a job to the jobtracker. Make sure that the checks are inplace before
* adding a job. This is the core job submission logic

核心代码两部分:

  • synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        for (JobInProgressListener listener : jobInProgressListeners) {
          listener.jobAdded(job);
        }
      }
    }
  • myInstrumentation.submitJob(job.getJobConf(), jobId);

有3个问题需要整明白:jobs?listener?myInstrumentation?

这次必须看看jobs是什么了:)

// All the known jobs.  (jobid->JobInProgress)
Map jobs = 
  Collections.synchronizedMap(new TreeMap());

JobInProgressListener 是JobInProgress lifecycle的callback。

可以想象,当一个JobInProgress完成时,一定有一个listener处理。这就叫async。

myInstrumentation是JobTrackerInstrumentation,

Instrumentation就是仪表,想想汽车方向盘那里的转速表,时速表。可以随时获取job的各种统计信息。

 

MapReduce与本地FS

另外,MapReduce不仅仅使用了HDFS,而且使用了本地文件系统存储部分临时文件。

可以想象,如果将本节点的Map过程的中间结果存储在HDFS,那么存储路径会经过网络,费力不讨好,事倍功半。

${hadoop.tmp.dir}/mapred/


  mapred.local.dir
  ${hadoop.tmp.dir}/mapred/local
  The local directory where MapReduce stores intermediate
  data files.  May be a comma-separated list of
  directories on different devices in order to spread disk i/o.
  Directories that do not exist are ignored.
 


  mapred.system.dir
  ${hadoop.tmp.dir}/mapred/system
  The directory where MapReduce stores control files.
 


  mapreduce.jobtracker.staging.root.dir
  ${hadoop.tmp.dir}/mapred/staging
  The root of the staging area for users' job files
  In practice, this should be the directory where users' home
  directories are located (usually /user)
 


  mapred.temp.dir
  ${hadoop.tmp.dir}/mapred/temp
  A shared directory for temporary files.
 

中间存储在本地文件系统,这意味着MapReduce与HDFS的交互集中在Mapper的读取,Reducer的写入两个过程。

这也应证了MapReduce担当数据分析的角色,与大数据的获取和存储没有多少关系。

 

本次小结

HDFS提供了一个类似POSIX的文件操作集。

MapReduce使用split管理输入输出。

DFSClient对象可作为分析HDFS的基点。

阅读(844) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~