Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2566173
  • 博文数量: 709
  • 博客积分: 12251
  • 博客等级: 上将
  • 技术积分: 7905
  • 用 户 组: 普通用户
  • 注册时间: 2005-07-17 00:00
个人简介

实现有价值的IT服务

文章存档

2012年(7)

2011年(147)

2009年(3)

2008年(5)

2007年(74)

2006年(431)

2005年(42)

分类: IT职场

2007-04-17 17:01:09

转:Hadoop Inside - JobConf

启动一个Hadoop任务,一般流程是创建一个JobConf,然后调用JobClient.runJob执行。就从这里入手分析。

runJob是一个静态方法,首先将输入的JobConf构造一个JobClient实例

    /**
     * Build a job client, connect to the default job tracker
     */
    public JobClient(Configuration conf) throws IOException {
      this.conf = conf;
      String tracker = conf.get("mapred.job.tracker", "local");
      if ("local".equals(tracker)) {
        this.jobSubmitClient = new LocalJobRunner(conf);
      } else {
        this.jobSubmitClient = (JobSubmissionProtocol)
          RPC.getProxy(JobSubmissionProtocol.class,
                       JobTracker.getAddress(conf), conf);
      }
    }

 从配置文件读取 mapred.job.tracker,判断是否为本地执行的任务,如果是远程,则使用RPC机制来构造一个 JobSubmissionProtocol接口 的代理。

        running = jc.submitJob(job);

提交任务到MapReduce系统。在submitJob()里面,创建该任务的相关文件(XML,Jar等)。在这里使用到了Hadoop的FileSystem,现在暂时忽略它。(在这里分析的代码中,大量存在着文件操作,出于简化分析的需要,仅仅简单分析意图,忽略细节部分)这部分代码执行之后,可以看到工作目录下面多了一个build目录,里面有job.xml这样的配置文件,里面有关于该任务的所有配置信息。

        JobStatus status = jobSubmitClient.submitJob(submitJobFile.getPath());
        if (status != null) {
            return new NetworkedJob(status);
        } else {
            throw new IOException("Could not launch job");
        }

真正提交任务,jobSubmitClient为 LocalJobRunner 或 RPC的代理类。这里使用LocalJobRunner来分析。

  public JobStatus submitJob(String jobFile) throws IOException {
    return new Job(jobFile, this.conf).status;
  }

 submitJob返回一个内部类Job的实例的状态,该实例继承了Thread和实现了 TaskUmbilicalProtocol接口。Job在构造函数中,首先初始化运行的配置文件,设置当前status为 JobStatus.RUNNING,然后将自身存放到 LocalJobRunner 的HashMap中,最后启动自身线程。

在Job类的run()方法,阐明了MapReduce的流程

        FileSplit[] splits;
        setWorkingDirectory(job, fs);
        splits = job.getInputFormat().getSplits(fs, job, 1);

使用InputFormat,将输入文件分为n个块。

下面是 Map操作 

        // run a map task for each split
        job.setNumReduceTasks(1);                 // force a single reduce task
        for (int i = 0; i < splits.length; i++) {
          mapIds.add("map_" + newId());
          setWorkingDirectory(job, fs);
          MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
          map.setConf(job);
          map_tasks += 1;
          map.run(job, this);
          map_tasks -= 1;
        }

对每块构造一个MapTask,并依次运行。这里首先分析 map.run(job, this);

MapTask的run方法声明是:
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)

1. 程序根据设置的 NumReduceTasks 值,构造OutputCollector和Reporter,以及CombiningCollector,RecordReader

2. 接下来实例化一个MapRunnable接口的实例,这里默认为MapRunner。MapRunner实例化的同时,根据JobConf.getMapperClass(),实例化Mapper类。

3. 调用MapRunner.run(),根据InputKeyClass和InputValueClass的类型,从RecordReader中读取key -value对, 调用 mapper.map(key, value, output, reporter);  这里就是执行用户自定义的Map过程。

4. 继续执行读取Key-Value和map()方法,直到 RecordReader 的数据读取完,返回到MapTask类。

5. 调用CombiningCollector的实例,对map返回的key进行聚集操作。

6.  最后关闭 SequenceFile.Writer[]。

自此,Map操作结束,接下来是将map产生的输出->reduce的输入

        // move map output to reduce input
        String reduceId = "reduce_" + newId();
        for (int i = 0; i < mapIds.size(); i++) {
          String mapId = (String)mapIds.get(i);
          File mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
          File reduceIn = this.mapoutputFile.getInputFile(mapId, reduceId);
          reduceIn.getParentFile().mkdirs();
          if (!FileSystem.getNamed("local", this.job).rename(mapOut, reduceIn))
            throw new IOException("Couldn't rename " + mapOut);
          this.mapoutputFile.removeAll(mapId);
        }

下面是 Reduce操作

        // run a single reduce task
        String mapDependencies[][] = new String[mapIds.size()][1];
        for (int i = 0; i < mapIds.size(); i++) {
            mapDependencies[i][0] = (String) mapIds.get(i);
        }
        setWorkingDirectory(job, fs);
        ReduceTask reduce = new ReduceTask(file, reduceId,
            mapDependencies,0);
        reduce.setConf(job);
        reduce_tasks += 1;
        reduce.run(job, this);
        reduce_tasks -= 1;
        this.mapoutputFile.removeAll(reduceId);

这里跟Map操作的代码很类似,主要对 ReduceTask 部分进行分析。run()方法:

 public void run(JobConf job, final TaskUmbilicalProtocol umbilical)

1. 获取ouput key-value的 class,以及Reducer实例

    Class keyClass = job.getOutputKeyClass();
    Class valueClass = job.getOutputValueClass();
    Reducer reducer = (Reducer)job.newInstance(job.getReducerClass());
    reducer.configure(job);
    FileSystem lfs = FileSystem.getNamed("local", job);

2. 打开输出文件

    // open a file to collect map output
    String file = job.getLocalFile(getTaskId(), "all.1").toString();
    SequenceFile.Writer writer =
      new SequenceFile.Writer(lfs, file, keyClass, valueClass);

3. 应该是读取输入文件(Map所输出的),并且将所有的输入文件导入一个文件里面。186-225行

4. 开一个监听线程,监听接下来的排序操作。

5. 对Key进行排序,这里的排序根据 JobConf 里面设置的 OutputKeyComparator 来进行。例如在例子里面就用到了 LongWritable.DecreasingComparator.class 来进行Long值的逆序排序。

    WritableComparator comparator = job.getOutputKeyComparator();
   
    try {
      sortProgress.start();

      // sort the input file
      SequenceFile.Sorter sorter =
        new SequenceFile.Sorter(lfs, comparator, valueClass, job);
      sorter.sort(file, sortedFile);              // sort
      lfs.delete(new File(file));                 // remove unsorted

    } finally {
      sortComplete = true;
    }

    sortPhase.complete();                         // sort is complete 

6. 初始化 RecordWriter 和 OutputCollector。

7. 执行Reduce操作

      ValuesIterator values = new ValuesIterator(in, length, comparator,
                                                 umbilical);
      while (values.more()) {
        reducer.reduce(values.getKey(), values, collector, reporter);
        values.nextKey();
      }

8. 发出通知

done(umbilical);

  public void done(TaskUmbilicalProtocol umbilical)
    throws IOException {
    umbilical.progress(getTaskId(),               // send a final status report
                       taskProgress.get(), taskProgress.toString());
    umbilical.done(getTaskId());
  }

在上面的代码中,有很多 reportProgress(umbilical); 这样的代码,TaskUmbilicalProtocol 是起到一个协议通讯的作用,无论是远程或者本地,都可以通过 TaskUmbilicalProtocol 来通知当前进展。

返回到LocalJobRunner.run(),最后

        this.status.runState = JobStatus.SUCCEEDED;

      } catch (Throwable t) {
        this.status.runState = JobStatus.FAILED;

当操作正常结束时,设置Status为结束,若有异常抛出,设为失败。这里返回去看看JobClient的代码

        JobStatus status = jobSubmitClient.submitJob(submitJobFile.getPath());
        if (status != null) {
            return new NetworkedJob(status);
        } else {
            throw new IOException("Could not launch job");
        }

 以及

      RunningJob running = null;
      String lastReport = null;
      try {
        running = jc.submitJob(job);

主线程获得了一个状态的引用,然后一直循环,直至任务在其它线程中执行完毕

        while (!running.isComplete()) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {}
          running = jc.getJob(jobId);
          String report = null;
          report = " map "+Math.round(running.mapProgress()*100)+"%  reduce " + Math.round(running.reduceProgress()*100)+"%";
          if (!report.equals(lastReport)) {
            LOG.info(report);
            lastReport = report;
          }
        }

流程到这里就结束了,在过了一遍代码后,大致了解了Hadoop的MapReduce执行机制,它由于设计到分布,因此使用了文件的方式对数据,中间结果,结果进行保存,使用统一的协议接口来对任务进行管理。代码中比较让人迷惑的是文件系统操作部分的代码,Hadoop的文件系统可以实现在不同网络上的机器使用一致的方式对文件进行操作。 

阅读(3358) | 评论(0) | 转发(0) |
0

上一篇:Hadoop学习笔记

下一篇:架构师书单

给主人留下些什么吧!~~