Chinaunix首页 | 论坛 | 博客
  • 博客访问: 179776
  • 博文数量: 36
  • 博客积分: 2510
  • 博客等级: 少校
  • 技术积分: 410
  • 用 户 组: 普通用户
  • 注册时间: 2009-04-04 12:39
文章分类

全部博文(36)

文章存档

2010年(1)

2009年(35)

我的朋友

分类: LINUX

2009-04-17 17:42:02

由于大量的使用interface,reflection,rpc proxy,所以当我们提交job给hadoop的时候,他到底是如何一步步运行的确实不太容易看明白,今天费了将近一天的功夫终于将其大概整理了出俩,为以后继续深入仔细阅读源码打下基础。

start JobTracker and TaskTracker by bin/start-all.sh
JobTracker initialization:
    read all host( master and slave )
    init QueueManager to manage job queue
    start JobQueueTaskScheduler to schedule job
        add JobQueueJobInProcessListener to manage submited jobs
        add EagerTaskInitialization, so when job was submited, JobinitThread will get first job in job queue and execute job ( throught job.initTask)
    start JobTracker Server which is instance of RPC.Server, derived from org.apache.hadoop.mapred.ipc.Server, so client can submit job by creating proxy
    start httpServer (third party library) so we can get status of job tracker through http
    init JobTrackerInstrumentation so record measure information
    start RecoverManager so we can try to recover unfinished history jobs
    init CompletedJobStatusStore to persist and retrive job information
    start ExpireTrackers thread to remove all expiry TaskTracker, whose heatbeat response out of time
    start RetireJobs to remove finished jobs that have been around for too long ( the job status is not runing, prepare ... )
    start ExpireLaunchingTasks to check task assignment failure and reassign the task
    join JobTracker Server to wait for job submit request

TaskTracker initialization:
    start http server to report map/reduce status by http
    start TaskTrackerInstrument to record measure info
    init JvmManager which can find free JVM to run task
    start taskReportServer (ipc.Server) with instance as TaskTracker
    create JobTracker proxy so the taskTracker can communicate with JobTracker
    start mapEventsFetcher to fetch all map event
    start mapLauncher and reduceLauncher ( instance of TaskLauncher) thread to monitor task queue, if new task was inserted, get first and startNewTask.
after TaskTracker initialization, start TaskTracker server which will looply communicate with JobTracker by Hearbbeat test, also get task.
    check buildVersion and file system
    purge map-event and reset reducer
    check if the TaskTracker should be restart
    get TaskAction from Heartbert message, if exist, add action to actionQueue



org.apache.hadoop.mapred.JobClient :
    public static RunningJob runJob(JobConf job)
    
    public RunningJob submitJob(JobConf job)
        copy job jar to DFS
        get input split
        sort input split
        write split info into file system
        jobSubmitClient.submitJob, jobSubmitClient is client proxy of org.apache.hadoop.mapred.JobTracker, which record status of all jobs

org.apache.hadoop.mapred.JobTracker :
    public JobStatus submitJob(JobID jobName)
        create JobInProcess with jobID
            copy job jar from DFS to local
        add job to JobQueueJobInProgressListener and EagerTaskInitializationListener, EagerTaskInitializationListener will call JobInProcess.initTask to create map and reduce task based on InputSplit, and create other assistant task, such as clean-up task ect.
    JobTracker call JobQueueTaskScheduler.assignTask to assign task for each request host
        call JobTracker.getSetupAndCleanupTasks() to get all tasks waited for execution, call TaskInProcess.addRunningTask() to set map or reduce task
        all got tasks were encapsulated in TaskActions
        send TaskActions to TaskTrackers through HeartbeatReponse

    TaskTracker get TaskAction from HeartbeatResponse message ( JobTracker call JobQueueTaskScheduler.assignTask to assign task for each request host)
    TaskTracker.TaskInProcess.registerTask() was called so TaskAction was transformed to Tasktracker.TaskInProcess (tip)
    the newly got tip was insert into task queue ( named tasksToLaunch) of TaskLauncher ( named mapLauncher and reduceLauncher), once new task was inserted:
    TaskTracker.startNewTask was called
    TaskTracker.localizeJob() was called
        add task to job, so MapEventFetcher can get map event (further thinking)
        get job jar file and create necessary working directory
        unjar job jar file
    TaskTracker.launchTaskForJob was called
    TaskTracker.TaskInProcess.launchTask was called
    TaskTracker.TaskInProces.localizeTask() was called
        create local working dir
        create symlink for job dir if it doesn't exist
        set resolved hostname
        set debug parameter if debugCommand exist
    Task.createRunner (MapTask or ReduceTask) was called
    TaskRunner.start()
        build parameter for JVM running, JVM was selected by JvmManager which obey singleInstance model
        entry of JVM is org.apache.hadoop.mapred.Child which will call, which will call Task.run()
    in MapTask.run() or ReduceTask.run(), our mapper or reducer code waw call through
    **************
        MapRunnable runner = ReflectionUtils.newInstance(job
                            .getMapRunnerClass(), job);
    **************
    finally, we finish our map/reduce work.

阅读(3368) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

raymond19842009-06-10 09:29:18

http://forum.mapbased.com/forum/DispSubject.html?pageIndex=0&id=216&_l=en_US this thread give more detailed explanation