Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1113596
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-06-11 21:03:28

注意点:

1、 需要先把文件上传到HDFS,例如:hadoop fs -put localpath /test/a.txt(-put 的参数顺序是先本地源文件、后HDFS目标路径)
2、 使用 job.addCacheFile() 添加文件时要注意参数:应使用 new URI(array[1])。有的资料介绍使用 (new File(args[3])).toURI(),我使用后者时报文件找不到。经过分析,File.toURI() 会生成带 file: 前缀的本地文件URI,导致在 setup 中读取不到该文件。
3、 在setup中读取有两种方式
         A、
 从hdfs文件系统上读取        

 Configuration conf=context.getConfiguration();

FileSystem fs = FileSystem.get(conf);

URI [] uris = context.getCacheFiles();

 FSDataInputStream in = fs.open(new Path(uris[0].getPath()));

B、
 从本节点读取

        Configuration conf=context.getConfiguration();

Path[]path  = context.getLocalCacheFiles();

FileSystem fsopen= FileSystem.getLocal(conf);

FSDataInputStream in = fsopen.open(path[0]);


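注意:从官方API(Hadoop 2.x 的 org.apache.hadoop.mapreduce.JobContext)来看,B方法使用的 getLocalCacheFiles() 已被标记为 @Deprecated,而A方法使用的 getCacheFiles() 并未被废弃,因此更推荐通过 getCacheFiles() 获取缓存文件的URI。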


点击(此处)折叠或打开

  1. import org.apache.commons.lang.StringUtils;
  2. import org.apache.commons.logging.Log;
  3. import org.apache.commons.logging.LogFactory;
  4. import org.apache.hadoop.fs.*;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.NullWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.MRJobConfig;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.util.GenericOptionsParser;

  15. import java.io.*;
  16. import java.net.URI;
  17. import java.net.URISyntaxException;
  18. import java.util.HashMap;


  19. public class FontProviderStatJob {
  20.     private static final Log LOG = LogFactory.getLog(FontProviderStatJob.class);
  21.     public static class FontProviderStatMapper extends Mapper<LongWritable, Text, Text, Text> {

  22.         @Override
  23.         protected void map(LongWritable key, Text value,
  24.                                       Context context) throws IOException, InterruptedException{
  25.             String strValue = value.toString();
  26.             if(StringUtils.isEmpty(strValue)){
  27.                 LOG.info("strValue is null");
  28.                return;
  29.             }
  30.             String []arrs = strValue.split("\t");
  31.             if(arrs.length!=2){
  32.                 LOG.info("strValue error, value:"+strValue);
  33.                 return;
  34.             }
  35.            // String name = providerMap.get(arrs[0]);
  36.             //System.out.println("name:"+name);
  37.             context.write(new Text(arrs[0]), new Text(arrs[1]));
  38.         }
  39.     }

  40.     public static class FontProviderStatReducer extends Reducer<Text, Text, NullWritable,Text>{
  41.         private HashMap<String, String> providerMap = new HashMap<String,String>();
  42.         FileSystem fileSystem=null;
  43.         @Override
  44.         protected void setup(Reducer.Context context) throws IOException, InterruptedException {
  45.             System.out.println("reducer setup start" );
  46.             try {
  47.                 URI[] localPaths = context.getCacheFiles();
  48.                 for (URI uri:localPaths){
  49.                     System.out.println(uri.toString());
  50.                     LOG.info("localPaths:"+uri.toString());
  51.                 }

  52.                 if(localPaths.length==0 ){
  53.                     throw new FileNotFoundException("Distributed cache file not found.");
  54.                 }

  55.                 FileSystem fileSystem = FileSystem.get(context.getConfiguration());
  56.                 FSDataInputStream fsData =fileSystem.open(new Path(localPaths[0].getPath()));
  57.                 BufferedReader bf = new BufferedReader(new InputStreamReader(fsData));
  58.                 String strData="";
  59.                 while ((strData=bf.readLine())!=null){
  60.                     String []arrays = strData.split("\t");
  61.                     this.providerMap.put(arrays[0],arrays[1]);
  62.                 }
  63.             }catch (Exception ex){
  64.                 System.out.println("Exception");
  65.                 ex.printStackTrace();
  66.             }

  67.         }
  68.         @Override
  69.         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
  70.             String provideName = this.providerMap.get(key.toString());
  71.              int count=0;
  72.              for (Text text:values){
  73.                  count++;
  74.              }
  75.             context.write(NullWritable.get(),new Text(provideName+"\t"+key +"\t"+String.valueOf(count)));
  76.         }
  77.     }

  78.     public static void main(String[] args){

  79.         try {
  80.             Job job = Job.getInstance();
  81.             job.setJarByClass(FontProviderStatJob.class);

  82.             String []array = new GenericOptionsParser(job.getConfiguration(),args).getRemainingArgs();
  83.             FileInputFormat.setInputPaths(job,new Path(array[0]));
  84.             //DistributedCache.addCacheFile(new URI(array[1]),job.getConfiguration());
  85.             URI url = (new File(array[1])).toURI();
  86.             System.out.println(url.toString());
  87.             System.out.println(job.getConfiguration().get(MRJobConfig.CACHE_FILES));
  88.             job.addCacheFile(new URI(array[1]));

  89.             //DistributedCache.addCacheFile(url,job.getConfiguration());
  90.             System.out.println(job.getConfiguration().get(MRJobConfig.CACHE_FILES));
  91.             //job.addCacheFile((new File(args[2])).toURI());
  92.             FileOutputFormat.setOutputPath(job,new Path(array[2]));

  93.             job.setMapperClass(FontProviderStatJob.FontProviderStatMapper.class);
  94.             job.setMapOutputKeyClass(Text.class);
  95.             job.setMapOutputValueClass(Text.class);

  96.             job.setReducerClass(FontProviderStatReducer.class);
  97.             job.setNumReduceTasks(1);
  98.             job.setOutputKeyClass(Text.class);
  99.             job.setOutputValueClass(Text.class);
  100.             boolean ret = job.waitForCompletion(true);
  101.             LOG.info("ret:"+ret);
  102.         } catch (IOException e) {
  103.             e.printStackTrace();

  104.         } catch (InterruptedException e) {
  105.             e.printStackTrace();
  106.         } catch (ClassNotFoundException e) {
  107.             e.printStackTrace();
  108.         } catch (URISyntaxException e) {
  109.             e.printStackTrace();
  110.         }

  111.     }


阅读(780) | 评论(0) | 转发(0) |
0

上一篇:Join

下一篇:storm学习笔记

给主人留下些什么吧!~~