Chinaunix首页 | 论坛 | 博客
  • 博客访问: 80847
  • 博文数量: 29
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 225
  • 用 户 组: 普通用户
  • 注册时间: 2014-03-06 15:31
文章分类

全部博文(29)

文章存档

2015年(18)

2014年(11)

我的朋友

分类: HADOOP

2014-09-03 11:01:51

wordmean是用来统计单词平均长度的程序

点击(此处)折叠或打开

  1. package org.apache.hadoop.examples;
  2. //求单词长度的平均数

  3. /**
  4.  * Licensed to the Apache Software Foundation (ASF) under one
  5.  * or more contributor license agreements. See the NOTICE file
  6.  * distributed with this work for additional information
  7.  * regarding copyright ownership. The ASF licenses this file
  8.  * to you under the Apache License, Version 2.0 (the
  9.  * "License"); you may not use this file except in compliance
  10.  * with the License. You may obtain a copy of the License at
  11.  *     http://www.apache.org/licenses/LICENSE-2.0
  12.  *
  13.  *
  14.  * Unless required by applicable law or agreed to in writing, software
  15.  * distributed under the License is distributed on an "AS IS" BASIS,
  16.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17.  * See the License for the specific language governing permissions and
  18.  * limitations under the License.
  19.  */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Charsets;

  38. public class WordMean extends Configured implements Tool {

  39.   private double mean = 0;

  40.   private final static Text COUNT = new Text("count");
  41.   private final static Text LENGTH = new Text("length");
  42.   private final static LongWritable ONE = new LongWritable(1);

  43.   /**
  44.    * Maps words from line of text into 2 key-value pairs; one key-value pair for
  45.    * counting the word, another for counting its length.
  46.    */
  47.   public static class WordMeanMapper extends
  48.       Mapper<Object, Text, Text, LongWritable> {

  49.     private LongWritable wordLen = new LongWritable();

  50.     /**
  51.      * Emits 2 key-value pairs for counting the word and its length. Outputs are
  52.      * (Text, LongWritable).
  53.      *
  54.      * @param value
  55.      * This will be a line of text coming in from our input file.
  56.      */
  57.     public void map(Object key, Text value, Context context)
  58.         throws IOException, InterruptedException {
  59.       StringTokenizer itr = new StringTokenizer(value.toString());
  60.       while (itr.hasMoreTokens()) {
  61.         String string = itr.nextToken();
  62.         this.wordLen.set(string.length());
  63.         context.write(LENGTH, this.wordLen);//单词长度(length,len)
  64.         context.write(COUNT, ONE);//单词次数(count,1)
  65.       }
  66.     }
  67.   }

  68.   /**
  69.    * Performs integer summation of all the values for each key.
  70.    */
  71.   public static class WordMeanReducer extends
  72.       Reducer<Text, LongWritable, Text, LongWritable> {

  73.     private LongWritable sum = new LongWritable();

  74.     /**
  75.      * Sums all the individual values within the iterator and writes them to the
  76.      * same key.
  77.      *
  78.      * @param key
  79.      * This will be one of 2 constants: LENGTH_STR or COUNT_STR.
  80.      * @param values
  81.      * This will be an iterator of all the values associated with that
  82.      * key.
  83.      */
  84.     public void reduce(Text key, Iterable<LongWritable> values, Context context)
  85.         throws IOException, InterruptedException {

  86.       int theSum = 0;
  87.       for (LongWritable val : values) {
  88.         theSum += val.get();
  89.       }
  90.       sum.set(theSum);
  91.       context.write(key, sum);
  92.     }
  93.   }
  94.   //reduce输出两个键值对,一个为(length,总长度),另一个为(count,单词总数量)

  95.   /**
  96.    * Reads the output file and parses the summation of lengths, and the word
  97.    * count, to perform a quick calculation of the mean.
  98.    *
  99.    * @param path
  100.    * The path to find the output file in. Set in main to the output
  101.    * directory.
  102.    * @throws IOException
  103.    * If it cannot access the output directory, we throw an exception.
  104.    */
  105.   private double readAndCalcMean(Path path, Configuration conf)
  106.       throws IOException {
  107.     FileSystem fs = FileSystem.get(conf);//HDFS API
  108.     Path file = new Path(path, "part-r-00000");

  109.     if (!fs.exists(file))
  110.       throw new IOException("Output not found!");

  111.     BufferedReader br = null;

  112.     // average = total sum / number of elements;
  113.     try {
  114.       br = new BufferedReader(new InputStreamReader(fs.open(file), Charsets.UTF_8));

  115.       long count = 0;
  116.       long length = 0;

  117.       String line;
  118.       while ((line = br.readLine()) != null) {
  119.         StringTokenizer st = new StringTokenizer(line);

  120.         // grab type
  121.         String type = st.nextToken();//类型,count或者length

  122.         // differentiate
  123.         if (type.equals(COUNT.toString())) {
  124.           String countLit = st.nextToken();//读取单词总数量
  125.           count = Long.parseLong(countLit);//转为long
  126.         } else if (type.equals(LENGTH.toString())) {
  127.           String lengthLit = st.nextToken();//读取单词总长度
  128.           length = Long.parseLong(lengthLit);//转为long
  129.         }
  130.       }

  131.       double theMean = (((double) length) / ((double) count));
  132.       System.out.println("The mean is: " + theMean);
  133.       return theMean;
  134.     } finally {
  135.       if (br != null) {
  136.         br.close();
  137.       }
  138.     }
  139.   }

  140.   public static void main(String[] args) throws Exception {
  141.     ToolRunner.run(new Configuration(), new WordMean(), args);
  142.   }

  143.   @Override
  144.   public int run(String[] args) throws Exception {
  145.     if (args.length != 2) {
  146.       System.err.println("Usage: wordmean ");
  147.       return 0;
  148.     }

  149.     Configuration conf = getConf();

  150.     @SuppressWarnings("deprecation")
  151.     Job job = new Job(conf, "word mean");
  152.     job.setJarByClass(WordMean.class);
  153.     job.setMapperClass(WordMeanMapper.class);
  154.     job.setCombinerClass(WordMeanReducer.class);
  155.     job.setReducerClass(WordMeanReducer.class);
  156.     job.setOutputKeyClass(Text.class);
  157.     job.setOutputValueClass(LongWritable.class);
  158.     FileInputFormat.addInputPath(job, new Path(args[0]));
  159.     Path outputpath = new Path(args[1]);
  160.     FileOutputFormat.setOutputPath(job, outputpath);
  161.     boolean result = job.waitForCompletion(true);
  162.     mean = readAndCalcMean(outputpath, conf);

  163.     return (result ? 0 : 1);
  164.   }

  165.   /**
  166.    * Only valuable after run() called.
  167.    *
  168.    * @return Returns the mean value.
  169.    */
  170.   public double getMean() {
  171.     return mean;
  172.   }
  173. }


阅读(2910) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~