
Category: Cloud Computing

2018-06-30 07:34:50

  MapReduce Hierarchical Pattern

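The hierarchical structure pattern builds new records from the data, with a structure different from the original one. Its main purpose is to turn row-based data into a hierarchical format such as JSON, XML, or whatever other format is needed.
The main idea is to use MultipleInputs to feed several input files into the map phase, each through its own Mapper class. Each Mapper tags its output records to mark which source they came from, and the reduce phase then processes each record according to that tag.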
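To make the "hierarchical" goal concrete, here is a minimal plain-Java sketch (no Hadoop) of what a reduce step in this pattern could do with all tagged values for one key: put the parent record's fields at the top level and nest the child records in an array, producing JSON text. The tag values 1 and 2 follow the mappers in this article; the record `Tagged`, the method `toJson`, and the field names `id`, `name` and `prices` are hypothetical. The JSON is assembled by hand without escaping, so this is only suitable for simple values.

```java
import java.util.ArrayList;
import java.util.List;

public class HierarchicalReduceSketch {

    // Hypothetical shape of a tagged value: the source flag plus the payload
    record Tagged(int type, String payload) {}

    // Builds one JSON object per key: the parent (type 1) becomes a top-level field,
    // every child (type 2) is nested in an array. No string escaping is performed.
    static String toJson(String key, List<Tagged> values) {
        String name = "";
        List<String> prices = new ArrayList<>();
        for (Tagged t : values) {
            if (t.type() == 1) {
                name = t.payload();
            } else {
                prices.add("\"" + t.payload() + "\"");
            }
        }
        return "{\"id\":\"" + key + "\",\"name\":\"" + name
                + "\",\"prices\":[" + String.join(",", prices) + "]}";
    }

    public static void main(String[] args) {
        // Values for one key arrive in no particular order across sources
        List<Tagged> values = List.of(
                new Tagged(2, "9.90"), new Tagged(1, "apple"), new Tagged(2, "12.50"));
        // prints {"id":"p01","name":"apple","prices":["9.90","12.50"]}
        System.out.println(toJson("p01", values));
    }
}
```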

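Implementation steps:
1. Define a custom composite key that holds the join key and a flag.
2. Define a separate Mapper class for each input source. When emitting, each Mapper sets the composite key's flag to identify the data source and emits the composite key.
3. In the reduce phase, process and output the records according to the composite key, handling each source as required.
4. In the driver, register the inputs with MultipleInputs.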
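Steps 1 to 3 can be simulated in memory to see how the pieces fit together. This is a plain-Java sketch, not Hadoop code: the class name `TaggedJoinSimulation`, the sample lines, and the column layout (join key in column 1 of the name file, column 0 of the price file, price in column 2) are assumptions chosen to mirror the mappers and reducer in the job below. A `TreeMap` stands in for the shuffle, which sorts and groups by the join key alone while each record keeps its flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaggedJoinSimulation {

    // Step 1: composite key = join key + source flag (1 = name file, 2 = price file)
    record JoinKey(String key, int type) {}

    record KeyValue(JoinKey key, String value) {}

    // Step 2: each "mapper" tags its records with its own flag
    static List<KeyValue> mapNames(List<String> lines) {
        List<KeyValue> out = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            out.add(new KeyValue(new JoinKey(f[1], 1), line));
        }
        return out;
    }

    static List<KeyValue> mapPrices(List<String> lines) {
        List<KeyValue> out = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            out.add(new KeyValue(new JoinKey(f[0], 2), line));
        }
        return out;
    }

    // Shuffle stand-in: sort and group by the join key only, ignoring the flag
    static Map<String, List<KeyValue>> shuffle(List<KeyValue> mapped) {
        Map<String, List<KeyValue>> groups = new TreeMap<>();
        for (KeyValue kv : mapped) {
            groups.computeIfAbsent(kv.key().key(), k -> new ArrayList<>()).add(kv);
        }
        return groups;
    }

    // Step 3: handle each record in a group according to its flag
    static List<String> reduce(Map<String, List<KeyValue>> groups) {
        List<String> out = new ArrayList<>();
        for (List<KeyValue> group : groups.values()) {
            String name = "";
            String price = "";
            for (KeyValue kv : group) {
                String[] f = kv.value().split("\t");
                if (kv.key().type() == 1) {
                    name = f[1];
                } else {
                    price = f[2];
                }
            }
            // Like the article's reducer: emit only keys that have a name record
            if (!name.isEmpty()) {
                out.add(name + " " + price + " " + group.size());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data
        List<KeyValue> mapped = new ArrayList<>(mapNames(List.of("1\tapple", "2\tpear")));
        mapped.addAll(mapPrices(List.of("apple\tfruit\t9.90", "banana\tfruit\t3.00")));
        // apple 9.90 2   (name and price records joined)
        // pear  1        (no price record, so the price field is empty; banana has no name and is dropped)
        reduce(shuffle(mapped)).forEach(System.out::println);
    }
}
```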

The implementation is as follows:


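import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Composite map-output key: the join key plus a flag recording the input source
// (1 = name file, 2 = price file)
public class MultiInputJoinKey implements WritableComparable<MultiInputJoinKey> {
    private Text key = new Text();
    private IntWritable type = new IntWritable();

    // Hadoop needs a no-arg constructor to create keys during deserialization
    public MultiInputJoinKey() {
    }

    public MultiInputJoinKey(Text key, IntWritable type) {
        this.key = key;
        this.type = type;
    }

    public Text getKey() { return key; }
    public void setKey(Text key) { this.key = key; }
    public IntWritable getType() { return type; }
    public void setType(IntWritable type) { this.type = type; }

    // Sort and group on the join key only, so records from both sources
    // that share a key end up in the same reduce() call
    @Override
    public int compareTo(MultiInputJoinKey o) {
        return this.key.compareTo(o.key);
    }

    // The default HashPartitioner uses hashCode(); hash only the join key so both
    // sources for one key reach the same reducer when there is more than one
    @Override
    public int hashCode() {
        return key.hashCode();
    }

    // Kept consistent with compareTo(): equality depends on the join key only
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MultiInputJoinKey)) return false;
        return key.equals(((MultiInputJoinKey) o).key);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        key.write(out);
        type.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        key.readFields(in);
        type.readFields(in);
    }
}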
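A composite key used with the default partitioner should override `hashCode()` so that it depends on the join key alone. Hadoop's default `HashPartitioner` picks a reducer with `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`; with `Object`'s identity-based `hashCode()`, the name record and the price record for the same key could be sent to different reducers. The driver's `setNumReduceTasks(1)` hides this, but it matters as soon as more reducers are used. The plain-Java sketch below applies the same formula to a hypothetical stand-in class `CompositeKey`; it uses `String.hashCode()` rather than `Text.hashCode()`, so the partition numbers differ from a real job, but the property shown is the same.

```java
public class PartitionSketch {

    // Hypothetical stand-in for MultiInputJoinKey: join key + source flag
    static final class CompositeKey {
        final String key;
        final int type;

        CompositeKey(String key, int type) {
            this.key = key;
            this.type = type;
        }

        // Hash only the join key; the flag must not influence partitioning
        @Override
        public int hashCode() {
            return key.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof CompositeKey && ((CompositeKey) o).key.equals(key);
        }
    }

    // Same formula as Hadoop's default HashPartitioner
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        CompositeKey fromNames = new CompositeKey("apple", 1);
        CompositeKey fromPrices = new CompositeKey("apple", 2);
        // prints true: both tagged records go to the same reducer
        System.out.println(partition(fromNames, reducers) == partition(fromPrices, reducers));
    }
}
```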



import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MultiInputJoinStatJob {

    // Mapper for the first input (name file): join key is column 1, flag = 1
    public static class MultiInputNameMapper extends Mapper<LongWritable, Text, MultiInputJoinKey, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arrays = value.toString().split("\t");
            if (arrays.length < 2) {
                return; // skip malformed lines instead of failing the task
            }
            System.out.println("MultiInputNameMapper data:" + value);
            MultiInputJoinKey multiInputJoinKey = new MultiInputJoinKey(new Text(arrays[1]), new IntWritable(1));
            context.write(multiInputJoinKey, value);
        }
    }

    // Mapper for the second input (price file): join key is column 0, flag = 2
    public static class MultiInputPriceMapper extends Mapper<LongWritable, Text, MultiInputJoinKey, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arrays = value.toString().split("\t");
            if (arrays.length < 3) {
                return; // the reducer reads column 2 of price records
            }
            System.out.println("MultiInputPriceMapper data:" + value);
            MultiInputJoinKey multiInputJoinKey = new MultiInputJoinKey(new Text(arrays[0]), new IntWritable(2));
            context.write(multiInputJoinKey, value);
        }
    }

    public static class MultiInputReduce extends Reducer<MultiInputJoinKey, Text, NullWritable, Text> {
        @Override
        protected void reduce(MultiInputJoinKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            String name = "";
            String price = "";
            for (Text text : values) {
                String[] arrays = text.toString().split("\t");
                // Hadoop reuses the key object and deserializes each record's key into it
                // while iterating, so getType() gives the source of the current value
                if (key.getType().get() == 1) { // record from the name mapper
                    name = arrays[1];
                } else {                          // record from the price mapper
                    price = arrays[2];
                }
                count++; // counts records from both sources
            }
            // Emit only keys that have a name record
            if (name.length() > 0) {
                String data = name + " " + price + " " + count;
                System.out.println("MultiInputReduce data:" + data);
                context.write(NullWritable.get(), new Text(data));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pass conf to the parser so generic options such as -D actually reach the job
        String[] arrays = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arrays.length < 3) {
            System.err.println("Usage: MultiInputJoinStatJob <name input> <price input> <output>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "MultiInputJoinStatJob");
        job.setJarByClass(MultiInputJoinStatJob.class);

        // Register each input path with its own Mapper class
        MultipleInputs.addInputPath(job, new Path(arrays[0]), TextInputFormat.class, MultiInputNameMapper.class);
        MultipleInputs.addInputPath(job, new Path(arrays[1]), TextInputFormat.class, MultiInputPriceMapper.class);

        // Delete an existing output directory, otherwise job submission fails
        Path outputPath = new Path(arrays[2]);
        FileSystem fs = outputPath.getFileSystem(job.getConfiguration());
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapOutputKeyClass(MultiInputJoinKey.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MultiInputReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);

        boolean ret = job.waitForCompletion(true);
        System.out.println("ret:" + ret);
        System.exit(ret ? 0 : 1);
    }
}

