Chinaunix首页 | 论坛 | 博客
  • 博客访问: 586410
  • 博文数量: 129
  • 博客积分: 6240
  • 博客等级: 准将
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2009-03-18 15:42
文章分类

全部博文(129)

文章存档

2015年(1)

2012年(3)

2011年(6)

2010年(14)

2009年(105)

我的朋友

分类: 系统运维

2009-04-03 15:05:56


1

               

public static class MapClass extends MapReduceBase

    implements Mapper{

   

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

   

    public void map(LongWritable key, Text value,

                    OutputCollector output,

                    Reporter reporter) throws IOException {

      String line = value.toString();

      StringTokenizer itr = new StringTokenizer(line);

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        output.collect(word, one);

      }

    }

  }

 

2.实现 Reduce

2类实现 Reducer 接口中的 reduce 方法, 入参数中的 key, values 是由 Map 务输出的中间结果,values 是一个 Iterator, 历这 Iterator, 就可以得到属于同一个 key 的所有 value. key 是一个单词value 词频只需要将所有的 value 相加,就可以得到单词的出次数。


2

               

public static class Reduce extends MapReduceBase

    implements Reducer {

   

    public void reduce(Text key, Iterator values,

                       OutputCollector output,

                       Reporter reporter) throws IOException {

      int sum = 0;

      while (values.hasNext()) {

        sum += values.next().get();

      }

      output.collect(key, new IntWritable(sum));

    }

  }

 

3.运行 Job

Hadoop 中一次算任称之一个 job, 可以通一个 JobConf 置如何运行 job。此出的 key 型是 Text, value 型是 IntWritable, 指定使用代1实现 MapClass Mapper , 使用代2实现 Reduce Reducer Combiner , 入路径和出路径由命令行参数指定,这样 job 运行入路径下的所有文件,并将果写到出路径下。

然后将 JobConf 象作参数, JobClient runJob, 算任至于 main 方法中使用的 ToolRunner 是一个运行 MapReduce 助工具,依画葫芦用之即可。


3

               

 public int run(String[] args) throws Exception {

    JobConf conf = new JobConf(getConf(), WordCount.class);

    conf.setJobName("wordcount");

   

    conf.setOutputKeyClass(Text.class);

    conf.setOutputValueClass(IntWritable.class);

   

    conf.setMapperClass(MapClass.class);       

    conf.setCombinerClass(Reduce.class);

    conf.setReducerClass(Reduce.class);

    

    conf.setInputPath(new Path(args[0]));

    conf.setOutputPath(new Path(args[1]));

       

    JobClient.runJob(conf);

    return 0;

  }

 

public static void main(String[] args) throws Exception {

    if(args.length != 2){

      System.err.println("Usage: WordCount ");

      System.exit(-1);

    }

    int res = ToolRunner.run(new Configuration(), new WordCount(), args);

    System.exit(res);

  }

}

 

以上就是 WordCount 程序的全部细节简单人吃惊,您都不敢相信就这么几行代就可以分布式运行于大模集群上,并行理海量数据集。

4. JobConf 定制算任

上文所述的 JobConf 象,程序可以定各参数,定制如何完成一个算任些参数很多情况下就是一个 java 接口,通注入些接口的特定实现,可以定一个算任( job )的全部细节。了解些参数及其缺省置,您才能在写自己的并行算程序做到轻车熟路,游刃有余,明白哪些是需要自己实现的,哪些 Hadoop 的缺省实现即可。表一是 JobConf 象中可以置的一些重要参数的总结明,表中第一列中的参数在 JobConf 中均会有相 get/set 方法,程序,只有在表中第三列中的缺省无法足您的需求,才需要 set 方法,定合适的参数实现自己的算目的。针对表格中第一列中的接口,除了第三列的缺省实现之外,Hadoop 通常会有一些其它的实现,我在表格第四列中列出了部分,您可以查阅 Hadoop API 文档或源代码获得更详细的信息,在很多的情况下,您都不用实现自己的 Mapper Reducer, 直接使用 Hadoop 的一些实现即可。


表一 JobConf 常用可定制参数

参数

作用

缺省

其它实现

InputFormat

入的数据集切割成小数据集 InputSplits, 一个 InputSplit 将由一个 Mapper 负责处理。此外 InputFormat 提供一个 RecordReader 实现, 将一个 InputSplit 解析成 提供 map 函数。

TextInputFormat
(
针对文本文件,按行将文本文件切割成 InputSplits, 并用 LineRecordReader InputSplit 解析成 key 是行在文件中的位置,value 是文件中的一行)

SequenceFileInputFormat

OutputFormat

提供一个 RecordWriter 实现负责输出最终结

TextOutputFormat
(
LineRecordWriter 将最终结果写成文件文件, 一行,key value tab 分隔)

SequenceFileOutputFormat

OutputKeyClass

出的最终结果中 key

LongWritable

OutputValueClass

出的最终结果中 value

Text

MapperClass

Mapper 实现 map 函数,完成入的 到中间结果的映射

IdentityMapper
(
入的 原封不间结)

LongSumReducer,
LogRegexMapper,
InverseMapper

CombinerClass

实现 combine 函数,将中间结果中的重 key 做合并

null
(
间结果中的重 key 做合并)

ReducerClass

Reducer 实现 reduce 函数,间结果做合并,形成最终结

IdentityReducer
(
将中间结果直接终结)

AccumulatingReducer, LongSumReducer

InputPath

job 入目, job 运行入目下的所有文件

null

OutputPath

job 出目job 的最终结果会写入出目

null

MapOutputKeyClass

map 函数出的中间结果中 key

如果用没有定的,使用 OutputKeyClass

MapOutputValueClass

map 函数出的中间结果中 value

如果用没有定的,使用 OutputValuesClass

OutputKeyComparator

对结果中的 key 行排序的使用的比

WritableComparable

PartitionerClass

间结果的 key 排序后,用此 Partition 函数将其划分R,份由一个 Reducer 负责处理。

HashPartitioner
(
使用 Hash 函数做 partition)

KeyFieldBasedPartitioner PipesPartitioner

 

WordCount 程序

在你 Hadoop 并行程序的细节有了比深入的了解,我来把 WordCount 程序改一下,目: (1) WordCount 程序按空格切分单词致各类标点符号与单词在一起,改后的程序应该正确的切出单词,并且单词不要区分大小写。(2)在最终结果中,按单词现频率的降序行排序。

1.修改 Mapper 实现(1)

实现简单4中的注


4

               

public static class MapClass extends MapReduceBase

    implements Mapper {

   

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

    private String pattern="[^\\w]"; //表达式,代表不是0-9, a-z, A-Z的所有其它字符

   

    public void map(LongWritable key, Text value,

                    OutputCollector output,

                    Reporter reporter) throws IOException {

      String line = value.toString().toLowerCase(); //全部转为小写字母

      line = line.replaceAll(pattern, " "); //将非0-9, a-z, A-Z的字符替换为空格

      StringTokenizer itr = new StringTokenizer(line);

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        output.collect(word, one);

      }

    }

  }

 

2.实现(2)

用 一个并行算任务显然是无法同完成单词词频统计和排序的,这时可以利用 Hadoop 的任管道能力,用上一个任(词频统计)出做下一个任(排序)入,行两个并行算任。主要工作是修改代3中的 run 函数,在其中定一个排序任并运行之。

Hadoop 中要实现排序是很简单的,因 MapReduce 程中,会把中间结果根据 key 排序并按 key 切成 R 份交 R Reduce 函数,而 Reduce 函数在理中间结果之前也会有一个按 key 行排序的程,故 MapReduce 出的最终结实际上已 key 排好序。词频统计务输出的 key 单词value 词频实现词频排序,我指定使用 InverseMapper 排序任 Mapper ( sortJob.setMapperClass(InverseMapper.class );) map 函数简单地将入的 key value 后作间结出,在本例中即是将词频 key,单词 value , 这样自然就能得到按词频排好序的最终结果。我无需指定 Reduce Hadoop 会使用缺省的 IdentityReducer ,将中间结果原样输出。

有一个问题需要解决: 排序任中的 Key 型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class)), Hadoop 认对 IntWritable 按升序排序,而我需要的是按降序排列。因此我们实现了一个 IntWritableDecreasingComparator , 并指定使用个自定 Comparator 类对输果中的 key (词频)行排 序:sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class)

详见 5 及其中的注


5

               

public int run(String[] args) throws Exception {

        Path tempDir = new Path("wordcount-temp-" + Integer.toString(

            new Random().nextInt(Integer.MAX_VALUE))); //一个临时

 

        JobConf conf = new JobConf(getConf(), WordCount.class);

        try {

            conf.setJobName("wordcount");

 

            conf.setOutputKeyClass(Text.class);

            conf.setOutputValueClass(IntWritable.class);

 

            conf.setMapperClass(MapClass.class);

            conf.setCombinerClass(Reduce.class);

            conf.setReducerClass(Reduce.class);

 

            conf.setInputPath(new Path(args[0]));

            conf.setOutputPath(tempDir); //先将词频统计果写到临时

                                         //, 下一个排序任临时录为输入目

           

            conf.setOutputFormat(SequenceFileOutputFormat.class);

           

            JobClient.runJob(conf);

 

            JobConf sortJob = new JobConf(getConf(), WordCount.class);

            sortJob.setJobName("sort");

 

            sortJob.setInputPath(tempDir);

            sortJob.setInputFormat(SequenceFileInputFormat.class);

 

            sortJob.setMapperClass(InverseMapper.class);

 

            sortJob.setNumReduceTasks(1); // Reducer 的个数限定1, 终输出的

                                //文件就是一个。

            sortJob.setOutputPath(new Path(args[1]));

            sortJob.setOutputKeyClass(IntWritable.class);

            sortJob.setOutputValueClass(Text.class);

                 

            sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class);

 

            JobClient.runJob(sortJob);

        } finally {

            FileSystem.get(conf).delete(tempDir); //临时

        }

    return 0;

  }

 

  private static class IntWritableDecreasingComparator extends IntWritable.Comparator {

      public int compare(WritableComparable a, WritableComparable b) {

        return -super.compare(a, b);

      }

     

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

          return -super.compare(b1, s1, l1, b2, s2, l2);

      }

  }

 

Eclipse 境下开发调试

Eclipse 境下可以方便地 Hadoop 并行程序的开发调试。推荐使用 IBM MapReduce Tools for Eclipse, 使用 Eclipse plugin 可以开发和部署 Hadoop 并行程序的程。基于 plugin, 可以在 Eclipse 建一个 Hadoop MapReduce 用程序,并且提供了一些基于 MapReduce 框架的类开发的向,可以打包成 JAR 文件,部署一个 Hadoop MapReduce 用程序到一个 Hadoop (本地和程均可),可以通一个专门视图 ( perspective ) Hadoop 器、Hadoop 分布式文件系 DFS )和当前运行的任的状

可在 IBM alphaWorks 网站下载这, 或在本文的下中下。将下后的压缩包解到你 Eclipse 安装目,重新启 Eclipse 即可使用了。

Hadoop 主目

Eclipse 主菜 Windows->Preferences, 然后在左侧选择 Hadoop Home Directory,定你的 Hadoop 主目,一所示:


1


立一个 MapReduce Project

Eclipse 主菜 File->New->Project, 出的对话框中选择 MapReduce Project, project name wordcount, 然后点 Finish 即可。, 2 所示:


2


此 后,你就可以象一个普通的 Eclipse Java project ,添加入 Java ,比如你可以定一个 WordCount ,然后将本文代1,2,3中的代写到此中,添加入必要的 import ( Eclipse 快捷 ctrl+shift+o 可以帮你),即可形成一个完整的 wordcount 程序。

在我们这简单 wordcount 程序中,我把全部的内容都放在一个 WordCount 中。实际 IBM MapReduce tools 提供了几个用的向 ( wizard ) 工具,帮你独的 Mapper Reducer MapReduce Driver (就是代3中那部分内容),在写比较复杂 MapReduce 程序,将独立出来是非常有必要的,也有利于在不同的算任中重用你写的各 Mapper Reducer

Eclipse 中运行

三所示,定程序的运行参数:入目出目之后,你就可以在 Eclipse 中运行 wordcount 程序了,当然,你也可以定断点,调试程序。


3

 

到 目前止,我 MapReduce 算模型,分布式文件系 HDFS,分布式并行算等的基本原理, 如何安装和部署 Hadoop 境,实际编写了一个 Hadoop 并行算程序,并了解了一些重要的细节,了解了如何使用 IBM MapReduce Tools Eclipse 境中编译,运行和调试你的 Hadoop 并行算程序。但一个 Hadoop 并行算程序,只有部署运行在分布式集群境中,才能发挥其真正的优势,在篇系列文章的第 3 部分中,你将了解到如何部署你的分布式 Hadoop 境,如何利用 IBM MapReduce Tools 将你的程序部署到分布式境中运行等内容。

 

阅读(1139) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~