Chinaunix首页 | 论坛 | 博客
  • 博客访问: 141182
  • 博文数量: 54
  • 博客积分: 2682
  • 博客等级: 少校
  • 技术积分: 580
  • 用 户 组: 普通用户
  • 注册时间: 2009-10-24 20:56
文章分类
文章存档

2012年(2)

2011年(10)

2010年(28)

2009年(14)

我的朋友

分类: Java

2009-11-27 15:59:31

很久没写了。今天来对hadoop0.1的源码分析加一点东西。
 
其实从hadoop的结构来说,计算部分mapreduce是建立在hdfs(hadoop distributed file system)模块上的,从阅读源码角度来说,肯定是要先阅读hdfs的。但是由于hdfs模块与真正实际的mapreduce算法相差甚远,为了直观起见,还是先讲mapreduce这个算法框架是如何在hadoop中实现的吧。
 
今天我记录的是Map任务是如何在hadoop中的生命周期。
下面是直接复制于我的笔记。
 
-----------------------我的笔记----------------------------------
 
 

MapTask的生命周期(虽然周期跨进程,但是整个周期都处在TaskTracker节点上,不存在处于不同节点的时刻)

---------------------------------Inside TaskTracker---------------------------------

1.TTofferService函数里获取Task

Task t=jobClient.pollForNewTask(taskTrackerName);

这个地方在JobTracker那走了一遭

2.把获取到的Task封装成TIP,并加入管理,TIP里从此多了个Task

TaskInProgress tip = new TaskInProgress(t, this.fconf);

这个地方在TIP的构造函数那走了一遭,把jobfileHDFS那拖下来

加入管理就是TTtasksrunningTasks

3.以后TT都只是用TIPTask进行操作,这个时候用

TaskInProgress.launchTask();启动一个task

 

---------------------------------Inside TaskInProgress---------------------------------

4.创建个TaskRunnerTIP里从此多了一个TaskRunner

this.runner = task.createRunner(TaskTracker.this); TaskTracker

这个地方在MapTaskcreateRunner走了一遭 return new MapTaskRunner

5.MapTaskRunner这个线程开启

也就是进入TaskRunner.start() --> TaskRunner.run()

TaskRunner这个线程的run方法是在TIP里使用的,目的是开启一个子进程,而MapTaskReduceTaskrun方法是在子进程里使用的,目的是做真正的事情

 

-------Inside MapTaskRunner(仍然在是TT开启的线程)----------------------

Run()

Prepare() -->MapTaskRunner.prepare()

 

 

 

Runchild()

拼凑命令行,最关键的命令行片段:

Child TTServer's:port taskID

该子进程伴随着一个输出监视线程,该线程运行在TT空间内

 

----------------------Inside Child(进入子进程,脱离TT-----------------------------

6.从命令行获取TTServer:porttaskid

String taskid

TaskUmbilicalProtocol umbilical = RPC.getProxy()

Task task = umbilical.getTask()

开启ping线程 ping线程和MapTask.run同时运行

准备进入MapTaskrun方法,从此子进程进入真正实质性做事情的阶段

 

----------------------Inside MapTask.run (仍然在子进程中)*****---------------

7.进入MapTask后,依次执行如下操作:

获取reduce个数int partition

构造partition个输出文件

构造partition函数(HashPartition(JobConfigurable))

构造OutputCollector函数(collect() { outfile.append(key,value)})

如果有combine,就构造CombiningCollector

构造RecordReader in(从输入读,从FileSplit)

构造MapRunner(输入,输出,reporter

进入MapRunner.run()

返回后调用TaskRunner.done(Umbilical)

8.进入MapRunner.run()

Read(key,value)

Map(key,value,collect)  <<<------用户的Mapper函数结合

这个地方是最耗时的部分,循环很多次

 

----------------------子进程结束MapTask结束--------------------------------------------

 

 

 

 

-----------------------我的笔记结束----------------------------------

 

其实在讲Map任务的生命周期之前,最好先讲MapReduce的任务调度策略,也就是一个任务是怎么在JobTracker(主控节点)上被调度出来的。留给以后讲吧。

 

另外,Reduce任务的生命周期我在做好笔记后会摘抄到这里。其实Reduce任务的生命周期和MapTask的生命周期有60%是一样的。

阅读(600) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~