分箱模式与区模式类似,都是在不考虑记录顺序的情况下,对记录进行分类。与分区不同的是,分箱是在Map阶段对数据进行拆分。这样可以将减少reduce阶段的输出工作量。
但该模式缺点是每个mapper将为每个输出箱子创建文件。假如有1000个箱子和1000个mapper,那么输出文件1000 000个文件。这对NameNode可扩展性及随后的分析不利,而分区每个类型只有1个输出文件。
主要原理采用MultipleOutputs在map阶段根据不同的Key输出到不同的文件中。
实现步骤:
1、在map阶段的 setup函数中新建
MultipleOutputs对象
2、在map阶段的cleanup函数中关闭在setup中创建的对象
3、在map函数中根据不同的Key write到不同的文件中
具体实现如下:
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
-
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-
import java.io.IOException;
-
-
public class BinningJob {
-
-
public static class BinningMapper extends Mapper<LongWritable , Text,NullWritable, Text>{
-
-
MultipleOutputs<NullWritable, Text> mos = null;
-
@Override
-
protected void setup(Context context){
-
mos= new MultipleOutputs(context);
-
}
-
@Override
-
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
String data = value.toString();
-
if(data.startsWith("17")){
-
mos.write("binsseven", value,NullWritable.get());
-
}else if(data.startsWith("16")){
-
mos.write("binssix", value, NullWritable.get(), "16");
-
}
-
}
-
-
@Override
-
public void cleanup(Context context) throws IOException, InterruptedException {
-
-
mos.close();
-
-
}
-
}
-
-
-
public static void main(String []args){
-
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("BinningJob");
-
job.setJarByClass(BinningJob.class);
-
job.setMapperClass(BinningMapper.class);
-
job.setMapOutputKeyClass(NullWritable.class);
-
job.setMapOutputValueClass(Text.class);
-
-
job.setNumReduceTasks(0);
-
-
FileInputFormat.setInputPaths(job,new Path(args[0]));
-
FileOutputFormat.setOutputPath(job,new Path(args[1]));
-
-
//不生成空文件
-
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
-
//重新定义输出目录
-
MultipleOutputs.addNamedOutput(job,"binssix", TextOutputFormat.class,Text.class, NullWritable.class);
-
MultipleOutputs.addNamedOutput(job,"binsseven", TextOutputFormat.class,Text.class, NullWritable.class);
-
System.out.println(job.waitForCompletion(true));
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
-
}
-
}
阅读(11114) | 评论(0) | 转发(0) |