Chinaunix首页 | 论坛 | 博客
  • 博客访问: 80447
  • 博文数量: 29
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 225
  • 用 户 组: 普通用户
  • 注册时间: 2014-03-06 15:31
文章分类

全部博文(29)

文章存档

2015年(18)

2014年(11)

我的朋友

分类: HADOOP

2014-09-03 11:04:52

转自http://blog.csdn.net/wangxw8746/article/details/9230323

点击(此处)折叠或打开

  1. package org.apache.hadoop.examples;
  2.   
  3. import java.io.IOException;
  4. import java.util.ArrayList;
  5. import java.util.StringTokenizer;
  6. import java.util.Map.Entry;
  7.   
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapred.JobClient;
  10. import org.apache.hadoop.mapred.JobConf;
  11. import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor;
  12. import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob;
  13.   
  14. /**
  15. 这个是hadoop的map/reduce的例子,是对例子WordCount利用系统已经实现的map/reduce类进行简化。系统已经实现的ValueAggregatorBaseDescriptor 和ValueAggregatorJob已经实现各种数据类型的求和最大值,最小值的算法。类型如下:
  16. UniqValueCount
  17. LongValueSum
  18. DoubleValueSum
  19. ValueHistogram
  20. LongValueMax
  21. LongValueMin
  22. StringValueMax
  23. StringValueMin
  24. 具体请看相关的源代码。
  25. 这个job的执行必须用-jarlibs执行,不然会报configured错误。
  26. 执行命令如下:
  27. hadoop jar hadoop-example.jar -libjars hadoop-example.jar shakepoems.text out_aggregate_his 3 textinputformat
  28.  
  29.  * This is an example Aggregated Hadoop Map/Reduce application. It reads the
  30.  * text input files, breaks each line into words and counts them. The output is
  31.  * a locally sorted list of words and the count of how often they occurred.
  32.  *
  33.  * To run: bin/hadoop jar hadoop-*-examples.jar aggregatewordcount in-dir
  34.  * out-dir numOfReducers textinputformat
  35.  *
  36.  */
  37. public class AggregateWordCount {
  38.   
  39.   /*继承类ValueAggregatorBaseDescriptor */
  40.   public static class WordCountPlugInClass extends
  41.       ValueAggregatorBaseDescriptor {
  42.     @Override
  43.     public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
  44.                                                             Object val) {
  45.       String countType = LONG_VALUE_SUM;//指定算法类型是long类型的求和
  46.       ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
  47.       String line = val.toString();
  48.       StringTokenizer itr = new StringTokenizer(line);
  49.       while (itr.hasMoreTokens()) {
  50.         Entry<Text, Text> e = generateEntry(countType, itr.nextToken(), ONE);
  51.         if (e != null) {
  52.           retv.add(e);
  53.         }
  54.       }
  55.       return retv;
  56.     }
  57.   }
  58.   
  59.   /**用静态类ValueAggregatorJob执行job
  60.    * The main driver for word count map/reduce program. Invoke this method to
  61.    * submit the map/reduce job.
  62.    *
  63.    * @throws IOException
  64.    * When there is communication problems with the job tracker.
  65.    */
  66.   @SuppressWarnings("unchecked")
  67.   public static void main(String[] args) throws IOException {
  68.     JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args
  69.         , new Class[] {WordCountPlugInClass.class});
  70.      
  71.     JobClient.runJob(conf);
  72.   }
  73.   
  74. }

阅读(1955) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~