注意点:
1、 需要先将文件上传到HDFS上(作为缓存文件的来源):hadoop fs -put localpath /test/a.txt
2、 在使用job.addCacheFile()添加文件时要注意参数:应使用new URI(array[1])。有的资料介绍使用(new File(args[3])).toURI(),我使用后者时报文件找不到。经过分析,File.toURI()会把路径当作本地文件处理,并在前面加上file:前缀,导致在setup中读取不到该文件
3、 在setup中读取有两种方式
A、 从hdfs文件系统上读取
Configuration conf = context.getConfiguration();
FileSystem fs = FileSystem.get(conf);
URI[] uris = context.getCacheFiles();
FSDataInputStream in = fs.open(new Path(uris[0].getPath()));
B、 从本节点读取
Configuration conf = context.getConfiguration();
Path[] path = context.getLocalCacheFiles();
FileSystem fs = FileSystem.getLocal(conf);
FSDataInputStream in = fs.open(path[0]);
从官方提供的API来看,B方法所用的getLocalCacheFiles()已被标记为@Deprecated(不建议使用),建议使用A方法,即通过getCacheFiles()获取缓存文件。
-
import org.apache.commons.lang.StringUtils;
-
import org.apache.commons.logging.Log;
-
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.fs.*;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.MRJobConfig;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.util.GenericOptionsParser;
-
-
import java.io.*;
-
import java.net.URI;
-
import java.net.URISyntaxException;
-
import java.util.HashMap;
-
-
-
public class FontProviderStatJob {
-
private static final Log LOG = LogFactory.getLog(FontProviderStatJob.class);
-
public static class FontProviderStatMapper extends Mapper<LongWritable, Text, Text, Text> {
-
-
@Override
-
protected void map(LongWritable key, Text value,
-
Context context) throws IOException, InterruptedException{
-
String strValue = value.toString();
-
if(StringUtils.isEmpty(strValue)){
-
LOG.info("strValue is null");
-
return;
-
}
-
String []arrs = strValue.split("\t");
-
if(arrs.length!=2){
-
LOG.info("strValue error, value:"+strValue);
-
return;
-
}
-
// String name = providerMap.get(arrs[0]);
-
//System.out.println("name:"+name);
-
context.write(new Text(arrs[0]), new Text(arrs[1]));
-
}
-
}
-
-
public static class FontProviderStatReducer extends Reducer<Text, Text, NullWritable,Text>{
-
private HashMap<String, String> providerMap = new HashMap<String,String>();
-
FileSystem fileSystem=null;
-
@Override
-
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
-
System.out.println("reducer setup start" );
-
try {
-
URI[] localPaths = context.getCacheFiles();
-
for (URI uri:localPaths){
-
System.out.println(uri.toString());
-
LOG.info("localPaths:"+uri.toString());
-
}
-
-
if(localPaths.length==0 ){
-
throw new FileNotFoundException("Distributed cache file not found.");
-
}
-
-
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
-
FSDataInputStream fsData =fileSystem.open(new Path(localPaths[0].getPath()));
-
BufferedReader bf = new BufferedReader(new InputStreamReader(fsData));
-
String strData="";
-
while ((strData=bf.readLine())!=null){
-
String []arrays = strData.split("\t");
-
this.providerMap.put(arrays[0],arrays[1]);
-
}
-
}catch (Exception ex){
-
System.out.println("Exception");
-
ex.printStackTrace();
-
}
-
-
}
-
@Override
-
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
-
String provideName = this.providerMap.get(key.toString());
-
int count=0;
-
for (Text text:values){
-
count++;
-
}
-
context.write(NullWritable.get(),new Text(provideName+"\t"+key +"\t"+String.valueOf(count)));
-
}
-
}
-
-
public static void main(String[] args){
-
-
try {
-
Job job = Job.getInstance();
-
job.setJarByClass(FontProviderStatJob.class);
-
-
String []array = new GenericOptionsParser(job.getConfiguration(),args).getRemainingArgs();
-
FileInputFormat.setInputPaths(job,new Path(array[0]));
-
//DistributedCache.addCacheFile(new URI(array[1]),job.getConfiguration());
-
URI url = (new File(array[1])).toURI();
-
System.out.println(url.toString());
-
System.out.println(job.getConfiguration().get(MRJobConfig.CACHE_FILES));
-
job.addCacheFile(new URI(array[1]));
-
-
//DistributedCache.addCacheFile(url,job.getConfiguration());
-
System.out.println(job.getConfiguration().get(MRJobConfig.CACHE_FILES));
-
//job.addCacheFile((new File(args[2])).toURI());
-
FileOutputFormat.setOutputPath(job,new Path(array[2]));
-
-
job.setMapperClass(FontProviderStatJob.FontProviderStatMapper.class);
-
job.setMapOutputKeyClass(Text.class);
-
job.setMapOutputValueClass(Text.class);
-
-
job.setReducerClass(FontProviderStatReducer.class);
-
job.setNumReduceTasks(1);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
boolean ret = job.waitForCompletion(true);
-
LOG.info("ret:"+ret);
-
} catch (IOException e) {
-
e.printStackTrace();
-
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
} catch (URISyntaxException e) {
-
e.printStackTrace();
-
}
-
-
}
阅读(780) | 评论(0) | 转发(0) |