outputFormat
hadoop依靠job的输出格式做两个主要的任务:
1.检验job的输出配置。
2.创建RecordWriter的实现写job的输出。
跟FileInputFormat相对应的,FileOutputFormat处理基于文件的输出。因为MapReduce job的大多数输出写到hdfs,很多基于文件的输出格式相应的api都能解决大部分的需求。Hadoop默认使用TextOutputFormat,存储tab分隔的键值对到配置的hdfs的输出目录。TextOutputFormat也检验开始运行job之前输出目录不能存在。
TextoutputFormat 使用LineRecordWriter对每个map或reduce任务写键值对,根据是否是reduce阶段。这个类使用toString方法序列化每一键值对到存储在hdfs的part文件里,用tab分隔键值。这个分隔符是默认的,能通过配置改变。
跟inputFormat类似,数据不会受限于只存在hdfs上。只要能用java把键值对写到其它源,例如jdbc,就可以用MapReduce做批量写。只需要保证你要写到的地方能处理多个任务产生的连接。
outputFormat抽象类有三个抽象方法需要实现:
checkOutputSpecs 用于验证job指定的输出,例如保证job提交之前输出目录不能存在。否则,输出可能覆盖(看具体配置)。
GetRecordWriter 方法返回RecordWriter的实现,来序列化键值对到输出,输出可以是FileSystem对象。
GetOutputCommiter Job的输出提交者在初始化期间设置每个任务,根据成功完成的状态提交(commit,区别于submit)任务,成功或其它状态,完成时都会清除任务。对基于文件的输出,FileOutputCommittter可以处理所有繁重的工作。它会对每个map任务创建临时输出目录,把成功的任务的输出移动到最终的输出目录。
RecordWriter
RecordWriter抽象类把键值对写到文件系统或另外的输出。与RecordReader不同,它没有初始化阶段。然而,可用构造器在需要的时候设置record writer。构造期间任何参数都能传入,因为record writer实例的创建是通过OutputFormat.getRecordWriter。
此类包含两个方法:
Write 这个方法由框架对每个要写的键值对调用。这个方法的实现很大程度上取决于你的使用。下面的例子中,我们展示怎样把键值对写到外部的内存键值存储,而不是文件系统。
Close 当处理完键值对时,框架调用这个方法。可以释放文件句柄,关闭跟其它服务的连接,或清除需要清除的任务。
具体实现如下:
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.*;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-
import redis.clients.jedis.Jedis;
-
-
import java.io.IOException;
-
-
public class RedisOutputFormatJob {
-
public static class RedisOutputFormatMapper extends Mapper<LongWritable, Text, Text, Text>{
-
-
@Override
-
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
String []arrays = value.toString().split("\t");
-
-
System.out.println("data:"+value.toString());
-
context.write(new Text(arrays[0]), new Text(arrays[1]));
-
-
}
-
}
-
-
public static class RedisOutputFormat extends OutputFormat<Text, Text> {
-
public static final String REDIS_HOSTS_CONF = "mapred.redishashoutputformat.hosts";
-
public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";
-
public static void setRedisHostsConf(Job job,String host){
-
job.getConfiguration().set(REDIS_HOSTS_CONF, host);
-
}
-
public static void setRedisHashKeyConf(Job job, String key){
-
job.getConfiguration().set(REDIS_HASH_KEY_CONF,key);
-
-
}
-
-
@Override
-
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-
return new RedisRecordWrite(context.getConfiguration().get(REDIS_HOSTS_CONF), context.getConfiguration().get(REDIS_HASH_KEY_CONF));
-
}
-
-
@Override
-
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-
-
String host = context.getConfiguration().get(REDIS_HOSTS_CONF);
-
if (host == null || host.isEmpty()) {
-
throw new IOException(REDIS_HOSTS_CONF
-
+ " is not set in configuration.");
-
}
-
-
String key = context.getConfiguration().get(REDIS_HASH_KEY_CONF);
-
if (key == null || key.isEmpty()) {
-
throw new IOException(REDIS_HASH_KEY_CONF
-
+ " is not set in configuration.");
-
}
-
}
-
-
@Override
-
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-
return (new NullOutputFormat<Text,Text>()).getOutputCommitter(context);
-
}
-
}
-
-
public static class RedisRecordWrite extends RecordWriter<Text, Text>{
-
//private String host;
-
private String key;
-
//private HashMap jedisMap = new HashMap();
-
Jedis jedis= null;
-
public RedisRecordWrite(String host,String key){
-
this.key = key;
-
int i=0;
-
//for (String strHost: host.split(",")){
-
jedis = new Jedis(host,7379);
-
jedis.connect();
-
// jedisMap.put(++i,jedis);
-
// }
-
-
}
-
-
@Override
-
public void write(Text field, Text value) throws IOException, InterruptedException {
-
jedis.hset(key, field.toString(),value.toString());
-
}
-
-
@Override
-
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-
jedis.close();
-
}
-
}
-
-
public static void main(String args[]){
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("RedisOutputFormatJob");
-
job.setJarByClass(RedisOutputFormatJob.class);
-
job.setMapperClass(RedisOutputFormatMapper.class);
-
job.setNumReduceTasks(0);
-
FileInputFormat.setInputPaths(job, new Path(args[2]));
-
-
job.setOutputFormatClass(RedisOutputFormat.class);
-
RedisOutputFormat.setRedisHostsConf(job,args[0]);
-
RedisOutputFormat.setRedisHashKeyConf(job,args[1]);
-
-
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
System.out.println(job.waitForCompletion(true));
-
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
}
-
}
阅读(2805) | 评论(0) | 转发(0) |