Data source: SogouQ (Sogou search engine query logs)
Statistic: the number of query words in each query (the mapper below assumes the words of a query are joined by '+')
The code is as follows. Mapper:
package Sogou;
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class SogouQueryWordCountClassifyMapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    public void map(LongWritable key, Text values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = values.toString();
        /* "ca::" counts the total number of query lines */
        output.collect(new Text("ca::"), one);
        /* classify by the number of query words, which are joined by '+' */
        String[] words = line.split("\\+");
        int length = words.length;
        String outline;
        if (0 == length) {
            outline = "c0::"; /* 0 query words (a line of only '+' characters) */
        } else if (1 == length) {
            outline = "c1::"; /* 1 query word */
        } else if (2 == length) {
            outline = "c2::"; /* 2 query words */
        } else if (3 == length) {
            outline = "c3::"; /* 3 query words */
        } else {
            outline = "c4::"; /* 4 or more query words */
        }
        output.collect(new Text(outline), one); /* map output, summed in the reducer */
    }
}
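To see how the classification behaves, here is a minimal standalone sketch (not part of the job; the class name and sample query strings are made up for illustration) that applies the same split("\\+") logic to a few lines:
package Sogou;
/* Hypothetical demo class, for illustration only */
public class SplitDemo {
    public static void main(String[] args) {
        String[] samples = { "hadoop", "hadoop+mapreduce", "a+b+c+d" };
        for (String line : samples) {
            int length = line.split("\\+").length;
            String outline = (0 == length) ? "c0::"
                    : (1 == length) ? "c1::"
                    : (2 == length) ? "c2::"
                    : (3 == length) ? "c3::" : "c4::";
            // prints: hadoop -> c1::, hadoop+mapreduce -> c2::, a+b+c+d -> c4::
            System.out.println(line + " -> " + outline);
        }
    }
}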
Reducer:
package Sogou;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class SogouQueryWordCountClassifyReducer extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get(); // accumulate the 1s emitted for this key
        }
        output.collect(key, new IntWritable(sum));
    }
}
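Since the reducer only adds integer counts (an associative, commutative operation), it could also serve as a combiner to pre-aggregate map output before the shuffle. This is a suggested tweak, not part of the original job; with the old mapred API it would be one extra line in the driver:
conf.setCombinerClass(Sogou.SogouQueryWordCountClassifyReducer.class);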
Partitioner:
package Sogou;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class PartitionerClass implements Partitioner<Text, IntWritable> {
    /* Route each statistic to its own reducer. Partition numbers must lie in
       [0, numPartitions), so the six keys map to partitions 0-5 (the original
       code returned 1-7, which overflows the valid range with six reducers). */
    public int getPartition(Text key, IntWritable values, int numPartitions) {
        if (numPartitions >= 6) {
            String k = key.toString();
            if (k.startsWith("ca::")) {
                return 0;
            } else if (k.startsWith("c0::")) {
                return 1;
            } else if (k.startsWith("c1::")) {
                return 2;
            } else if (k.startsWith("c2::")) {
                return 3;
            } else if (k.startsWith("c3::")) {
                return 4;
            } else if (k.startsWith("c4::")) {
                return 5;
            } else {
                /* unexpected keys fall back to a hash partition within range */
                return (k.hashCode() & Integer.MAX_VALUE) % numPartitions;
            }
        } else {
            return 0;
        }
    }
    public void configure(JobConf job) {}
}
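To sanity-check the routing, the partitioner can be exercised directly. This is a hypothetical test harness (the class name PartitionerDemo is made up); with six reducers each statistic key lands in its own partition, i.e. its own part-0000N output file:
package Sogou;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class PartitionerDemo {
    public static void main(String[] args) {
        PartitionerClass p = new PartitionerClass();
        String[] keys = { "ca::", "c0::", "c1::", "c2::", "c3::", "c4::" };
        for (String k : keys) {
            // expected: ca:: -> 0, c0:: -> 1, ..., c4:: -> 5
            System.out.println(k + " -> partition "
                    + p.getPartition(new Text(k), new IntWritable(1), 6));
        }
    }
}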
Driver:
package Sogou;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class SogouQueryWordCountClassify {
    public static void main(String[] args) {
        JobConf conf = new JobConf(SogouQueryWordCountClassify.class);
        conf.setJobName("SogouQueryWordCountClassify");
        // specify output types
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        // specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        // specify a mapper
        conf.setMapperClass(SogouQueryWordCountClassifyMapper.class);
        // specify a reducer
        conf.setReducerClass(SogouQueryWordCountClassifyReducer.class);
        // specify a partitioner, and enough reduce tasks for its six partitions
        conf.setPartitionerClass(PartitionerClass.class);
        conf.setNumReduceTasks(6);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
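Assuming the classes are packed into a jar (the jar name and HDFS paths below are placeholders), the job is launched the usual way, with an existing input directory of SogouQ data and a not-yet-existing output directory:
hadoop jar SogouQueryWordCountClassify.jar Sogou.SogouQueryWordCountClassify /sogou/input /sogou/output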
Results:
c1:: 19266013 // queries with 1 query word
c2:: 1621804 // queries with 2 query words
c3:: 364414 // queries with 3 query words
c4:: 174710 // queries with 4 or more query words
ca:: 21426941 // total number of queries
As a sanity check, the per-class counts add up to the total: 19,266,013 + 1,621,804 + 364,414 + 174,710 = 21,426,941, which matches ca::.