Category: Cloud Computing

2018-06-30 20:32:00

The partitioning pattern classifies records, but it does not care about their order. Its main purpose is to split the similar records in a dataset into separate, smaller datasets.

The core idea is to write a custom partitioner class that extends Partitioner and, according to the business requirements, implements the partition function public int getPartition(Text key, Text value, int numPartitions), so that records with the same key are routed to the same reduce call. Note that if NumReduceTasks is set to 1 in the driver, the partition function is never executed. This is understandable: with only one reducer, there is nothing for the Partitioner to decide.
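For reference, the partitioner Hadoop applies when you do not set one, org.apache.hadoop.mapreduce.lib.partition.HashPartitioner, is essentially a plain hash-modulo (sketched here from its documented behavior, not quoted verbatim):

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    // Route by hashCode(); masking with Integer.MAX_VALUE keeps the
    // result non-negative even for Integer.MIN_VALUE hash codes.
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

A custom Partitioner is only worth writing when the routing must differ from this default, for example sending specific key ranges to specific reducers.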
 
The complete implementation follows:


import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionJob {

    // Emits the first tab-separated field of each line as the key,
    // with the whole line as the value.
    public static class PartitionMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text tmpKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String data = value.toString();
            String[] arrays = data.split("\t");
            tmpKey.set(arrays[0]);
            context.write(tmpKey, value);
        }
    }

    // Writes each record of the group through unchanged; the partitioner
    // has already decided which output file (reducer) the group goes to.
    public static class PartitionReduce extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(NullWritable.get(), val);
            }
        }
    }

    // Custom partitioner: records with the same key always land in the
    // same partition. The Integer.MAX_VALUE mask keeps the result
    // non-negative even when hashCode() returns Integer.MIN_VALUE,
    // a case where Math.abs() would still yield a negative number.
    public static class Partition extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static void main(String[] args) {
        try {
            Job job = Job.getInstance();
            job.setJobName("PartitionJob");
            job.setJarByClass(PartitionJob.class);
            job.setMapperClass(PartitionMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setReducerClass(PartitionReduce.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setPartitionerClass(Partition.class);
            // With a single reduce task the partitioner never runs,
            // so use at least 2 to see the partitioning take effect.
            job.setNumReduceTasks(2);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Remove any leftover output directory so the job can rerun.
            FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);

            System.out.println(job.waitForCompletion(true));
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
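With job.setNumReduceTasks(2), a successful run leaves one output file per partition, part-r-00000 and part-r-00001, and every record sharing the same first field lands in the same file. A minimal launch might look like this (the jar name and paths are placeholders, not from the original post):

hadoop jar partition-job.jar PartitionJob /user/demo/input /user/demo/output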
