Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2475666
  • 博文数量: 392
  • 博客积分: 7040
  • 博客等级: 少将
  • 技术积分: 4138
  • 用 户 组: 普通用户
  • 注册时间: 2009-06-17 13:03
个人简介

范德萨发而为

文章分类

全部博文(392)

文章存档

2017年(5)

2016年(19)

2015年(34)

2014年(14)

2013年(47)

2012年(40)

2011年(51)

2010年(137)

2009年(45)

分类: 服务器与存储

2010-03-08 19:39:50

数据来源:SogouQ
统计信息:对每个查询中的查询词的数目进行统计

代码如下:

package Sogou;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class SogouQueryWordCountClassifyMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        IntWritable one = new IntWritable(1);
        String line = values.toString();
        /* c#a表示统计查询的总行数 */
        String outline = "ca::";
        output.collect(new Text(outline), one);

        /* 按查询词的个数进行分类 */
        String[] words = line.split("\\+");
        int length = words.length;
        
        if (0 == length) {
            outline = "c0::"; /* 0个查询词 */
        }
        else if (1 == length) {
            outline = "c1::"; /* 1个查询词 */
        }
        else if (2 == length) {
            outline = "c2::"; /* 2个查询词 */
        }
        else if (3 == length) {
            outline = "c3::"; /* 3个查询词 */
        }
        else {
            outline = "c4::"; /* 4个及以上查询词 */
        }
        
        output.collect(new Text(outline), one); /* map输出,用于reduce计数 */
    }
}


package Sogou;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class SogouQueryWordCountClassifyReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        int sum = 0;
        while (values.hasNext()) {
            // process value

            sum += values.next().get();
        }
        
        output.collect(key, new IntWritable(sum));
    }

}


package Sogou;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class PartitionerClass implements Partitioner<Text, IntWritable> {
    
    public int getPartition(Text key, IntWritable values, int numPartitions) {
        /* 根据map的输出来区别不同的统计信息 */
        if (numPartitions >= 6) {
            if (key.toString().startsWith("ca::")) {
                return 1;
            }
            else if (key.toString().startsWith("c0::")) {
                return 2;
            }
            else if (key.toString().startsWith("c1::")) {
                return 3;
            }
            else if (key.toString().startsWith("c2::")) {
                return 4;
            }
            else if (key.toString().startsWith("c3::")) {
                return 5;
            }
            else if (key.toString().startsWith("c4::")) {
                return 6;
            }
            else {
                return 7;
            }
        }
        else {
            return 0;
        }
    }
    
    public void configure(JobConf job) {}
}


package Sogou;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class SogouQueryWordCountClassify {

    public static void main(String[] args) {
        JobClient client = new JobClient();
        JobConf conf = new JobConf(Sogou.SogouQueryWordCountClassify.class);
        conf.setJobName("SogouQueryWordCountClassify");

        // specify output types

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // specify input and output DIRECTORIES (not files)

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // specify a mapper

        conf.setMapperClass(Sogou.SogouQueryWordCountClassifyMapper.class);

        // specify a reducer

        conf.setReducerClass(Sogou.SogouQueryWordCountClassifyReducer.class);
        
        // specify a partitioner

        conf.setPartitionerClass(Sogou.PartitionerClass.class);

        client.setConf(conf);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}


统计结果
c1::    19266013   //1个查询词的查询次数
c2::    1621804    //2个查询词的查询次数
c3::    364414     //3个查询词的查询次数
c4::    174710     //4个及以上查询词的查询次数
ca::    21426941   //总的查询次数

阅读(3033) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~