
Category: Cloud Computing

2018-06-21 20:11:03

Performing a join in the map phase (map-side join)

Steps:
1. In the driver, register the lookup file on HDFS with the job's distributed cache.
2. In the mapper's setup() method, read the cached file and load its contents into a HashMap.
3. In the map() function, look up the matching value for each record's key in the HashMap and perform the join on the map side (a sample data layout is sketched below).
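The post does not show the data itself, so here is a hypothetical layout consistent with the code: both files are tab-separated, the cache file maps an id to a name, and the main input maps the same id to an event.

cache file on HDFS (id<TAB>name), hypothetical contents:
    1001	alice
    1002	bob

main input (id<TAB>event), hypothetical contents:
    1001	click
    1001	view
    1002	click

With this input, the mapper emits keys like "1001	alice", and the reducer counts records per joined key, producing lines such as "1001	alice	2".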
The concrete implementation is as follows:


import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;

public class MapperJoinStatJob {
    private static final Log LOG = LogFactory.getLog(MapperJoinStatJob.class);

    public static void main(String[] args) {
        try {
            Job job = Job.getInstance();
            job.setJobName("MapperJoinStatJob");
            job.setJarByClass(MapperJoinStatJob.class);

            // args: <input path> <cache file on HDFS> <output path>
            String[] array = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
            FileInputFormat.setInputPaths(job, new Path(array[0]));
            // Step 1: register the lookup file in the distributed cache
            job.addCacheFile(new URI(array[1]));
            FileOutputFormat.setOutputPath(job, new Path(array[2]));

            job.setMapperClass(MapperJoinStatMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setReducerClass(ReducerJoinStatReducer.class);
            job.setNumReduceTasks(1);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            boolean ret = job.waitForCompletion(true);
            LOG.info("ret:" + ret);
        } catch (IOException | InterruptedException | ClassNotFoundException | URISyntaxException e) {
            e.printStackTrace();
        }
    }

    public static class MapperJoinStatMapper extends Mapper<LongWritable, Text, Text, Text> {
        // In-memory lookup table built from the cached file
        private HashMap<String, String> nameMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Step 2: read the cached file and load it into the HashMap.
            // Note: a missing cache file must fail the task, so it is not
            // swallowed by a catch block here.
            URI[] localPaths = context.getCacheFiles();
            if (localPaths == null || localPaths.length == 0) {
                throw new FileNotFoundException("Distributed cache file not found.");
            }
            for (URI uri : localPaths) {
                LOG.info("localPaths:" + uri.toString());
            }

            FileSystem fileSystem = FileSystem.get(context.getConfiguration());
            FSDataInputStream fsData = fileSystem.open(new Path(localPaths[0].getPath()));
            BufferedReader bf = new BufferedReader(new InputStreamReader(fsData));
            try {
                String strData;
                while ((strData = bf.readLine()) != null) {
                    String[] arrays = strData.split("\t");
                    this.nameMap.put(arrays[0], arrays[1]);
                }
            } finally {
                bf.close();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String strValue = value.toString();
            if (StringUtils.isEmpty(strValue)) {
                LOG.info("strValue is empty");
                return;
            }
            // Step 3: join each record against the in-memory table by key
            String[] arrs = strValue.split("\t");
            String name = nameMap.get(arrs[0]);
            context.write(new Text(arrs[0] + "\t" + name), new Text(arrs[1]));
        }
    }

    public static class ReducerJoinStatReducer extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Count the records for each joined key
            int count = 0;
            for (Text text : values) {
                count++;
            }
            context.write(NullWritable.get(), new Text(key + "\t" + String.valueOf(count)));
        }
    }
}
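A minimal sketch of how such a job might be submitted; the jar name and the three HDFS paths below are placeholder assumptions, not from the post:

    hadoop jar mapper-join-stat.jar MapperJoinStatJob \
        /user/demo/input /user/demo/cache/names.txt /user/demo/output

The three remaining arguments map to array[0], array[1], and array[2] in main(): the input directory, the cache file registered via addCacheFile(), and the output directory (which must not already exist).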
