分类: 服务器与存储
2011-11-09 18:14:18
本次记录MapReduce与HDFS的结合。不打算细致的分析代码的所有细节,目标是理解MapReduce是怎么run在HDFS上的。
HDFS的Client
现在仍然不明朗的问题是Client怎么与HDFS交互?
一般FS通常的做法是提供read、write系统调用,然后由Client调用。
HDFS本身是Java代码,Client也是Java代码。
如果提供系统调用,那么需要用C实现VFS内核接口,app函数栈是Java->C->Java,usr->kernel->usr有点罗嗦,这种实现也叫做FUSE。
另外剩下的可能就是提供一个lib,由client调用,而不通过VFS层。
?Who is client ?
bin/hadoop fs命令称为FsShell,作用很像linux系统里的Shell。它会不会是HDFS的一个Client?
这个脚本里的注释说:“run a generic filesystem user client”。刚才的推测似乎有点眉目。
elif [ "$COMMAND" = "fs" ] ; then
CLASS=org.apache.hadoop.fs.FsShell
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
shell脚本里说明FsShell实际的java类是org.apache.hadoop.fs.FsShell,可是这个类的实现里没有调用open,close,read,write之类。
按理说POSIX定义了open,read等的标准API,想想C语言的用户程序,HDFS也应该有这些操作才对。
按照这个思路,找到了DFSClient,这里有基本的open,close,read,append操作。
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
* to communicate with a NameNode daemon, and connects
* directly to DataNodes to read/write block data.
*
* Hadoop DFS users should obtain an instance of
* DistributedFileSystem, which uses DFSClient to handle
* filesystem tasks.
*
********************************************************/
?FsShell为什么不使用DFSClient呢?
上面的问题暂且放一边。继续查看代码。
通过DistributedFileSystem使用DFSClient。
从注释上看,end-user应用程序应该使用它与HDFS交互。
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
* This object is the way end-user code interacts with a Hadoop
* DistributedFileSystem.
*
*****************************************************************/
DistributedFileSystem extends FileSystem,而且open,read等操作都是对DFSClient的封装。
再仔细看看FileSystem类,open,read等操作都是抽象接口。(好像是“策略模式”)
public abstract class FileSystem
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
* may be implemented as a distributed filesystem, or as a "local"
* one that reflects the locally-connected disk. The local version
* exists for small Hadoop instances and for testing.
*
*
* All user code that may potentially use the Hadoop Distributed
* File System should be written to use a FileSystem object. The
* Hadoop DFS is a multi-machine system that appears as a single
* disk. It's useful because of its fault tolerance and potentially
* very large capacity.
*
* The local implementation is {@link LocalFileSystem} and distributed
* implementation is DistributedFileSystem.
*****************************************************************/
注释里说了,user code都应该使用FileSystem。
看到这里应该明白了吧!FileSystem / DistributedFileSystem / DFSClient 越来越底层,越来越偏向具体实现。
再回头看看FsShell的实现,使用的全部都是FileSystem,与前面的代码结合起来了。
也证明了前头“FsShell是HDFS的一个Client的推测”是正确的。
MapReduce与HDFS的结合点
按照前面的分析,如果MapReduce是HDFS的Client话,那么MapReduce中的文件操作应该使用FileSystem类,代码中会有大量FileSystem类。
hadoop/src/mapred/代码中总共获得FileSystem对象的引用450多次,XXX.getFileSystem(conf)使用的次数更多,不再详细探究。
具体看个例子吧,还是WordCount.java,代码很简洁。
./examples/org/apache/hadoop/examples/WordCount.java
Mapper,Reducer仅仅包含处理逻辑,不涉及IO操作。
main中大部分代码是set job,而job启动代码就是最后的job.waitForCompletion(true)。
与FileIO相关的也就两句:
FileInputFormat.addInputPath将path解析,然后设置job的configuration,即mapred.input.dir。
FileOutputFormat.setOutputPath类似,设置了mapred.output.dir。
job.waitForCompletion中理应调用conf获得input,output目录,作为mapred的输入,输出。
该方法的注释说Submit the job to the cluster and wait for it to finish. 这就是咱们要找的start point。
./mapred/org/apache/hadoop/mapreduce/Job.java
核心代码也就两句submit,isSuccessful。
提交Job,然后检查结果。按照经验,isSuccessful过程阻塞等待job结束。
Submit就是关键,Fire,XXX。
/ * Submit the job to the cluster and return immediately. */
代码:
JobTracker看名字是与monitor,debug等相关。state是设置job状态。
剩下的就是jobClient.submitJobInternal(conf)。
jobClient是什么呢?原来是在connect生成的。
根据conf构建一个jobClient对象。Hadoop说:要有JobClient,于是就有了jobClient。
./mapred/org/apache/hadoop/mapred/JobClient.java
JobClient类是实际干活的,Job就是马甲,还是策略模式。当某人玩策略下大棋的时候,干活的另有其人。
* JobClient
provides facilities to submit jobs, track their
* progress, access component-tasks' reports/logs, get the Map-Reduce cluster
* status information etc.
job的提交过程包括:
从这个过程看,前面的都是prepare,第5步是point。另外,第2、3步也是存储相关的。
这里的注释还给出了一个Hadoop中的MapReduce程序框架,与WordCount里的用法一致。
* // Create a new JobConf
* JobConf job = new JobConf(new Configuration(), MyJob.class);
*
* // Specify various job-specific parameters
* job.setJobName("myjob");
*
* job.setInputPath(new Path("in"));
* job.setOutputPath(new Path("out"));
*
* job.setMapperClass(MyJob.MyMapper.class);
* job.setReducerClass(MyJob.MyReducer.class);
*
* // Submit the job, then poll for progress until the job is complete
* JobClient.runJob(job);
还是回到原来的思路,jobClient.submitJobInternal(conf)。从代码上看,个人感觉应该调用jobClient.submitJob(conf)。
不得不说这个方法是个大家伙。一上来就是return,而且return的ugi.doAs方法里参数是一个巨大的匿名内部类,一下还看不清ugi.doAs需要几个参数。
以指定的用户运行授权的动作。gui.doAs且放一边,action。
这个基于PrivilegedExceptionAction匿名内部类仅实现了run方法,包括几行preparation,紧跟一个大的try-catch-finally。
根据try子句内部的注释,将run的主要内容分为5部分:
这5步与前面注释里描述的job的提交过程很像。前面说过,感兴趣的是1, split的创建,2, job的提交。
split的创建
writeSplits返回的是Mapper task数量。
看着代码,回想一下WordCount的执行过程:mapper的数量增加,从0%到100%。
这里的writeNewSplits,writeOldSplits总感觉与这个过程有联系。
先看看writeNewSplits
根据conf的设定,使用反射找到具体的Input类;生成splits,然后排序;创建split files。
剩下的就是,需要明白InputSplit,input.getSplits,JobSplitWriter.createSplitFiles。
InputSplit是个抽象类,表示一个Mapper要处理的数据。InputSplit将数据看做是bytes。
注释中还说,job的RecordReader将把InputSplit转化为record。
InputSplit仅有2个抽象方法,getLength,getLocations。
getLength返回split中bytes的个数,后面用了比较和排序。
getLocations返回split所在节点的node name列表,字符串数组。
input.getSplits将输入在“逻辑上划分”为许多splits,也就是说split是个逻辑概念,不是物理上将数据拆分。
从使用反射的代码以及前面的FileInputFormat推测,InputFormat类会是一个抽象接口。
job的InputFormat在MapReduce中起3个作用:
FileInputFormat的默认行为是按照输入文件目录的总大小划分为InputSplit。
那么通常的做法就是totalsize / nodesnumber,具体还要看文件在HDFS上是怎么分布存储的。
另外,InputSplit划分的大小有一个上限,就是块大小,这里是默认的64MB。也就是说一个Mapper最大的工作量是一个块。
InputSplit大小的下限通过配置文件设定。
很多时候按照byte为单位划分工作量是没有意义的,因为很多App处理的是一个个对象(record)。
因此,应该按照具体record为单位划分工作量。这时候需要前面提到的RecordReader将工作量按record进行划分。
WordCount使用的是FileInputFormat,它也是通常的InputFormat。具体看看getSplits实现。
./mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
FileInputFormat.getSplits基本过程如下:
在FileInputFormat中length>0的单一文件总是默认可拆分。
“无内容”的文件不需mapper处理,hosts列表为空。
“有内容可拆分”部分是主要的代码。
splitSize的计算前面提过,Max(minSize,Min(blockSize,maxSize))
而blkLocations一猜就知道是按照文件逻辑地址顺序排列,一会儿探讨。
FileSplit的内容也知道了,就是(文件路径,该split在文件中的起始地址,该split大小,该split所在节点)
SPLIT_SLOP这里是1.1,也是就说如果剩余部分不超过splitsize的10%就不会拆分。
另外,blkLocations的获取需要看看,它的具体存储节点很有意思。前两节实际运行Hadoop时发现:HDFS对大文件块的存储节点选择是动态决策的。
此处我的WordCount运行在HDFS上,它的FileSystem对应的具体类型是DistributedFileSystem,于是找到:
dfs.getBlockLocations(getPathName(file.getPath()), start, len);
回忆前面FileSystem / DistributedFileSystem / DFSClient的关系,具体代码是DFSClient.getBlockLocations。
方法的注释* It returns a set of hostnames for every block within the indicated region.
DFSClient.getBlockLocations输入的参数是path,start,len。具体过程是:
BlockLocation中记录一个blk区域(start,length)和对应3组名称集{names,hosts,racks}。
name = hostname:portNumber
host = hostname (no portnumber,NameNode使用Ip,DataNode可以使用ip和DNS)
rack = new BaseNode(name, networklocation)
这里的networklocation是xml配置里设置的代表机架的逻辑名称,树状结构,有点像文件系统的目录空间。
到这里input.getSplits完成任务,小结一下当文件系统为HDFS时getSplits的过程:
首先,计算合适的splitsize;
然后,查询HDFS的NameNode获得文件的数据分布情况BlockLocations[];
最后,根据splitsize和BlockLocations[]计算splits[]。
split的内容是:文件路径,数据地址范围,所在节点。
JobSplitWriter.createSplitFiles
JobSplitWriter被JobClient用来write splits,包括meta和raw data。
从参数列表看,splits的存储位置是jobSubmitDir。
从MapReduce的角度看,写下的splits仅仅是逻辑上的如何划分工作量的记录,不包含应用数据。
createSplitFiles方法的核心代码有4句,两部分rawdata,metadata:
createFile在HDFS时最终调用了DFSClient.create,具体细节在下次分析HDFS再看。
writeNewSplits将split对象序列化并写入jobsplit file,返回的SplitMetaInfo[] 。
SplitMetaInfo[] 记录了split的locations——是个string[],以及在jobsplit file里的offset,length。
writeJobSplitMetaInfo将split的存储信息都写入jobsplit meta file中,
包括meta file标示(UTF-8编码的META-SPL),split版本(目前是1),SplitMetaInfo[]长度,每个SplitMetaInfo对象。
可以看出,meta file可用于快速定位某个split在文件中的具体位置。
至此,split的创建结束了,本过程小结:
split是关于如何划分MapReduce里mapper的工作量的对象。
回顾一下,MapReduce是将计算迁移到数据所在的节点,不会迁移数据;
可以说split是MapReduce对数据的划分,File是文件系统对数据的划分,block是存储系统对数据的划分。
App Mapper向下看到的是MapReduce的split,MapReduce向下看到的是HDFS的File,HDFS向下看到的是Block。
job的提交
jobSubmitClient.submitJob。什么是jobSubmitClient?
private JobSubmissionProtocol jobSubmitClient;
JobSubmissionProtocol 是一个interface。
/**
* Protocol that a JobClient and the central JobTracker use to communicate. The
* JobClient can use these methods to submit a Job for execution, and learn about
* the current system status.
*/
public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException;
JobClient和JobTracker通信的接口,这个可能与RPC有关。
那么jobSubmitClient具体的对象是什么?找到了init()方法,核心代码如下:
根据docs/mapred-default.html,找到mapred.job.tracker的解释。
The host and port that the MapReduce job tracker runs at. If "local", then jobs are run in-process as a single map and reduce task.
这次使用hadoop的相关配置是conf/mapred-site.xml
那么就是通过createRPCProxy()获得jobSubmitClient对象。
RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, addr,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class))
JobSubmissionProtocol.versionID在这里是28。
addr在这里是192.168.15.62:9001。
NetUtils.getSocketFactory() 在这里是javax.net.SocketFactory。
./core/org/apache/hadoop/ipc/RPC.java
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
通常在RPC里的proxy也叫做stub,客户端和服务端都有,在client处代表server,在server处代表client,典型的中介。
相关的动词常用marshal,unmarshal,通俗的说也就是打包,解包。
回到代码,核心部分:
java.lang.reflect.Proxy; 涉及interposition,参见:
http://java.sun.com/developer/technicalArticles/JavaLP/Interposing/
whatever,jobSubmitClient.submitJob()经过网络调用的是server.submitJob(),而这里的server是JobTracker。
./mapred/org/apache/hadoop/mapred/JobTracker.java
先检查类的声明JobTracker implements …, JobSubmissionProtocol, … { … },与前面的结论一致。
* JobTracker.submitJob() kicks off a new job. 意思是说:正式开始new job,前面的都是为它准备。job本尊驾到。
这是个超过4屏的方法,其主体分3部分,两次加锁同步。
个人感觉第二次加锁的范围有点大,检查的内容可以放到外面。
private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
throws IOException {}
addJob的声明包含了同步,submitJob()的第二次加锁很多余。
* Adds a job to the jobtracker. Make sure that the checks are inplace before
* adding a job. This is the core job submission logic
核心代码两部分:
有3个问题需要整明白:jobs?listener?myInstrumentation?
这次必须看看jobs是什么了:)
// All the known jobs. (jobid->JobInProgress)
Map
Collections.synchronizedMap(new TreeMap
JobInProgressListener 是JobInProgress lifecycle的callback。
可以想象,当一个JobInProgress完成时,一定有一个listener处理。这就叫async。
myInstrumentation是JobTrackerInstrumentation,
Instrumentation就是仪表,想想汽车方向盘那里的转速表,时速表。可以随时获取job的各种统计信息。
MapReduce与本地FS
另外,MapReduce不仅仅使用了HDFS,而且使用了本地文件系统存储部分临时文件。
可以想象,如果将本节点的Map过程的中间结果存储在HDFS,那么存储路径会经过网络,费力不讨好,事倍功半。
${hadoop.tmp.dir}/mapred/
data files. May be a comma-separated list of
directories on different devices in order to spread disk i/o.
Directories that do not exist are ignored.
In practice, this should be the directory where users' home
directories are located (usually /user)
中间存储在本地文件系统,这意味着MapReduce与HDFS的交互集中在Mapper的读取,Reducer的写入两个过程。
这也应证了MapReduce担当数据分析的角色,与大数据的获取和存储没有多少关系。
本次小结
HDFS提供了一个类似POSIX的文件操作集。
MapReduce使用split管理输入输出。
DFSClient对象可作为分析HDFS的基点。