Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1108872
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-06-30 23:21:14

    分箱模式与区模式类似,都是在不考虑记录顺序的情况下,对记录进行分类。与分区不同的是,分箱是在Map阶段对数据进行拆分。这样可以将减少reduce阶段的输出工作量。
但该模式缺点是每个mapper将为每个输出箱子创建文件。假如有1000个箱子和1000个mapper,那么输出文件1000 000个文件。这对NameNode可扩展性及随后的分析不利,而分区每个类型只有1个输出文件。
     主要原理采用MultipleOutputs在map阶段根据不同的Key输出到不同的文件中。

     实现步骤:
    1、在map阶段的 setup函数中新建MultipleOutputs对象
    2、在map阶段的cleanup函数中关闭在setup中创建的对象
    3、在map函数中根据不同的Key write到不同的文件中
    
    具体实现如下:
 

点击(此处)折叠或打开

  1. import org.apache.hadoop.fs.Path;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  11. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

  12. import java.io.IOException;

  13. public class BinningJob {

  14.     public static class BinningMapper extends Mapper<LongWritable , Text,NullWritable, Text>{

  15.         MultipleOutputs<NullWritable, Text> mos = null;
  16.         @Override
  17.         protected void setup(Context context){
  18.             mos= new MultipleOutputs(context);
  19.         }
  20.         @Override
  21.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  22.             String data = value.toString();
  23.             if(data.startsWith("17")){
  24.                 mos.write("binsseven", value,NullWritable.get());
  25.             }else if(data.startsWith("16")){
  26.                 mos.write("binssix", value, NullWritable.get(), "16");
  27.             }
  28.         }

  29.         @Override
  30.         public void cleanup(Context context) throws IOException, InterruptedException {

  31.             mos.close();

  32.         }
  33.     }


  34.     public static void main(String []args){

  35.         try {
  36.             Job job = Job.getInstance();
  37.             job.setJobName("BinningJob");
  38.             job.setJarByClass(BinningJob.class);
  39.             job.setMapperClass(BinningMapper.class);
  40.             job.setMapOutputKeyClass(NullWritable.class);
  41.             job.setMapOutputValueClass(Text.class);

  42.             job.setNumReduceTasks(0);

  43.             FileInputFormat.setInputPaths(job,new Path(args[0]));
  44.             FileOutputFormat.setOutputPath(job,new Path(args[1]));

  45.             //不生成空文件
  46.             LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  47.             //重新定义输出目录
  48.             MultipleOutputs.addNamedOutput(job,"binssix", TextOutputFormat.class,Text.class, NullWritable.class);
  49.             MultipleOutputs.addNamedOutput(job,"binsseven", TextOutputFormat.class,Text.class, NullWritable.class);
  50.             System.out.println(job.waitForCompletion(true));
  51.         } catch (IOException e) {
  52.             e.printStackTrace();
  53.         } catch (InterruptedException e) {
  54.             e.printStackTrace();
  55.         } catch (ClassNotFoundException e) {
  56.             e.printStackTrace();
  57.         }

  58.     }
  59. }



阅读(11114) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~