// Reposted from http://blog.csdn.net/wangxw8746/article/details/9230323
-
package org.apache.hadoop.examples;
-
-
import java.io.IOException;
-
import java.util.ArrayList;
-
import java.util.StringTokenizer;
-
import java.util.Map.Entry;
-
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapred.JobClient;
-
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor;
-
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob;
-
-
/**
-
这个是hadoop的map/reduce的例子,是对例子WordCount利用系统已经实现的map/reduce类进行简化。系统已经实现的ValueAggregatorBaseDescriptor 和ValueAggregatorJob已经实现各种数据类型的求和最大值,最小值的算法。类型如下:
-
UniqValueCount
-
LongValueSum
-
DoubleValueSum
-
ValueHistogram
-
LongValueMax
-
LongValueMin
-
StringValueMax
-
StringValueMin
-
具体请看相关的源代码。
-
这个job的执行必须用-jarlibs执行,不然会报configured错误。
-
执行命令如下:
-
hadoop jar hadoop-example.jar -libjars hadoop-example.jar shakepoems.text out_aggregate_his 3 textinputformat
-
-
* This is an example Aggregated Hadoop Map/Reduce application. It reads the
-
* text input files, breaks each line into words and counts them. The output is
-
* a locally sorted list of words and the count of how often they occurred.
-
*
-
* To run: bin/hadoop jar hadoop-*-examples.jar aggregatewordcount in-dir
-
* out-dir numOfReducers textinputformat
-
*
-
*/
-
public class AggregateWordCount {
-
-
/*继承类ValueAggregatorBaseDescriptor */
-
public static class WordCountPlugInClass extends
-
ValueAggregatorBaseDescriptor {
-
@Override
-
public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
-
Object val) {
-
String countType = LONG_VALUE_SUM;//指定算法类型是long类型的求和
-
ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
-
String line = val.toString();
-
StringTokenizer itr = new StringTokenizer(line);
-
while (itr.hasMoreTokens()) {
-
Entry<Text, Text> e = generateEntry(countType, itr.nextToken(), ONE);
-
if (e != null) {
-
retv.add(e);
-
}
-
}
-
return retv;
-
}
-
}
-
-
/**用静态类ValueAggregatorJob执行job
-
* The main driver for word count map/reduce program. Invoke this method to
-
* submit the map/reduce job.
-
*
-
* @throws IOException
-
* When there is communication problems with the job tracker.
-
*/
-
@SuppressWarnings("unchecked")
-
public static void main(String[] args) throws IOException {
-
JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args
-
, new Class[] {WordCountPlugInClass.class});
-
-
JobClient.runJob(conf);
-
}
-
-
}
// Views (1955) | Comments (0) | Reposts (0)