Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1111821
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-07-01 16:50:45

outputFormat
hadoop依靠job的输出格式做两个主要的任务:
1.检验job的输出配置。
2.创建RecordWriter的实现写job的输出。
跟FileInputFormat相对应的,FileOutputFormat处理基于文件的输出。因为MapReduce job的大多数输出写到hdfs,很多基于文件的输出格式相应的api都能解决大部分的需求。Hadoop默认使用TextOutputFormat,存储tab分隔的键值对到配置的hdfs的输出目录。TextOutputFormat也检验开始运行job之前输出目录不能存在。

TextoutputFormat 使用LineRecordWriter对每个map或reduce任务写键值对,根据是否是reduce阶段。这个类使用toString方法序列化每一键值对到存储在hdfs的part文件里,用tab分隔键值。这个分隔符是默认的,能通过配置改变。

跟inputFormat类似,数据不会受限于只存在hdfs上。只要能用java把键值对写到其它源,例如jdbc,就可以用MapReduce做批量写。只需要保证你要写到的地方能处理多个任务产生的连接。

outputFormat抽象类有三个抽象方法需要实现:
checkOutputSpecs  用于验证job指定的输出,例如保证job提交之前输出目录不能存在。否则,输出可能覆盖(看具体配置)。

GetRecordWriter  方法返回RecordWriter的实现,来序列化键值对到输出,输出可以是FileSystem对象。

GetOutputCommiter  Job的输出提交者在初始化期间设置每个任务,根据成功完成的状态提交(commit,区别于submit)任务,成功或其它状态,完成时都会清除任务。对基于文件的输出,FileOutputCommittter可以处理所有繁重的工作。它会对每个map任务创建临时输出目录,把成功的任务的输出移动到最终的输出目录。

RecordWriter
RecordWriter抽象类把键值对写到文件系统或另外的输出。与RecordReader不同,它没有初始化阶段。然而,可用构造器在需要的时候设置record writer。构造期间任何参数都能传入,因为record writer实例的创建是通过OutputFormat.getRecordWriter。
此类包含两个方法:
Write   这个方法由框架对每个要写的键值对调用。这个方法的实现很大程度上取决于你的使用。下面的例子中,我们展示怎样把键值对写到外部的内存键值存储,而不是文件系统。
Close   当处理完键值对时,框架调用这个方法。可以释放文件句柄,关闭跟其它服务的连接,或清除需要清除的任务。

具体实现如下:

点击(此处)折叠或打开

  1. import org.apache.hadoop.fs.Path;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.*;
  5. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  6. import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
  7. import redis.clients.jedis.Jedis;

  8. import java.io.IOException;

  9. public class RedisOutputFormatJob {
  10.     public static class RedisOutputFormatMapper extends Mapper<LongWritable, Text, Text, Text>{

  11.         @Override
  12.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  13.             String []arrays = value.toString().split("\t");

  14.             System.out.println("data:"+value.toString());
  15.             context.write(new Text(arrays[0]), new Text(arrays[1]));

  16.         }
  17.     }

  18.     public static class RedisOutputFormat extends OutputFormat<Text, Text> {
  19.         public static final String REDIS_HOSTS_CONF = "mapred.redishashoutputformat.hosts";
  20.         public static final String REDIS_HASH_KEY_CONF = "mapred.redishashinputformat.key";
  21.         public static void setRedisHostsConf(Job job,String host){
  22.             job.getConfiguration().set(REDIS_HOSTS_CONF, host);
  23.         }
  24.         public static void setRedisHashKeyConf(Job job, String key){
  25.             job.getConfiguration().set(REDIS_HASH_KEY_CONF,key);

  26.         }

  27.         @Override
  28.         public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
  29.             return new RedisRecordWrite(context.getConfiguration().get(REDIS_HOSTS_CONF), context.getConfiguration().get(REDIS_HASH_KEY_CONF));
  30.         }

  31.         @Override
  32.         public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {

  33.             String host = context.getConfiguration().get(REDIS_HOSTS_CONF);
  34.             if (host == null || host.isEmpty()) {
  35.                 throw new IOException(REDIS_HOSTS_CONF
  36.                         + " is not set in configuration.");
  37.             }

  38.             String key = context.getConfiguration().get(REDIS_HASH_KEY_CONF);
  39.             if (key == null || key.isEmpty()) {
  40.                 throw new IOException(REDIS_HASH_KEY_CONF
  41.                         + " is not set in configuration.");
  42.             }
  43.         }

  44.         @Override
  45.         public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
  46.             return (new NullOutputFormat<Text,Text>()).getOutputCommitter(context);
  47.         }
  48.     }

  49.     public static class RedisRecordWrite extends RecordWriter<Text, Text>{
  50.         //private String host;
  51.         private String key;
  52.         //private HashMap jedisMap = new HashMap();
  53.         Jedis jedis= null;
  54.        public RedisRecordWrite(String host,String key){
  55.            this.key = key;
  56.            int i=0;
  57.            //for (String strHost: host.split(",")){
  58.                jedis = new Jedis(host,7379);
  59.                jedis.connect();
  60.            // jedisMap.put(++i,jedis);
  61.           // }

  62.         }

  63.         @Override
  64.         public void write(Text field, Text value) throws IOException, InterruptedException {
  65.             jedis.hset(key, field.toString(),value.toString());
  66.         }

  67.         @Override
  68.         public void close(TaskAttemptContext context) throws IOException, InterruptedException {
  69.             jedis.close();
  70.         }
  71.     }

  72.     public static void main(String args[]){
  73.         try {
  74.             Job job = Job.getInstance();
  75.             job.setJobName("RedisOutputFormatJob");
  76.             job.setJarByClass(RedisOutputFormatJob.class);
  77.             job.setMapperClass(RedisOutputFormatMapper.class);
  78.             job.setNumReduceTasks(0);
  79.             FileInputFormat.setInputPaths(job, new Path(args[2]));

  80.             job.setOutputFormatClass(RedisOutputFormat.class);
  81.             RedisOutputFormat.setRedisHostsConf(job,args[0]);
  82.             RedisOutputFormat.setRedisHashKeyConf(job,args[1]);


  83.             job.setOutputKeyClass(Text.class);
  84.             job.setOutputValueClass(Text.class);
  85.             System.out.println(job.waitForCompletion(true));

  86.         } catch (IOException e) {
  87.             e.printStackTrace();
  88.         } catch (InterruptedException e) {
  89.             e.printStackTrace();
  90.         } catch (ClassNotFoundException e) {
  91.             e.printStackTrace();
  92.         }
  93.     }
  94. }


阅读(2797) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~