Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1106550
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-07-01 14:07:53

InputFormat

Hadoop依赖job的输入格式做三件事:
1.校验job的输入配置,例如数据是否存在。
2.分割文件块为逻辑上的inputSplit类型的块,每一个对应一个map任务。
3.创建RecordReader的实现从inputsplit创建键值对。这些键值对一个一个发送到mapper。

       最常用的输入格式的子类是FileInputFormat,hadoop默认是TextInputFormat。这个类首先校验job的输入,保证输入路径的存在。然后根据文件字节数大小逻辑分割输入文件,使用块大小作为分割边界值。例如,160M的文件,块大小64M时分成三个逻辑块,0M-64M,64m-128M,128M-160M.每个map任务都会对应其中一个块,然后RecordReader负责生成键值对。

       通常,recordReader有额外的修复边界问题的责任,因为输入分割边界是任意的,很有可能不是记录的边界。例如,TextInputFormat使用LineRecordReader读取文本文件对每个map的每一文本行创建键值对,例如用换行符分割。键是读到的一行的字节数,值是整行字符串。因为它不像输入分片的字节块,会用换行符分开,LineRecordRead会负责读到行的末尾保证读到一条完整的记录。不同数据块的这点数据(一个完整行)理论上可能不在相同的节点,所以从所在主机上读。这个读由FSDataInputStream类处理,我们就不必处理去哪儿找数据块。

       使用自己的格式时不要害怕经过了分割的边界,只需要检测没有重复或丢失数据。
       Notice:自定义的输入格式不限于文件输入。你可以把输入表示为InputSplit对象和键值对,自定义或其它的,可以在一个MapReduce job里并行读入任何东西到map阶段。只需要记住输入分片表示什么和利用数据本地性的优势。
       InputFormat抽象类有两个抽象方法:
        getSplits
       典型的实现是利用JobConText对象获取配置的输入并返回该对象的list。Inputsplit有个方法返回表示数据在集群中位置的机器的数组,提示TaskTracker应该执行的map任务。这个方法也是验证配置的正确性或抛出需要的异常的合适的地方,因为方法使用在前面。(例如在提交到jobTracker之前)
        CreateRecordReader
    这个方法在后面使用,用来生成RecordReader的实现,随后详细讨论。通常,一个新实例创建并立即返回,因为record reader有个初始化方法被框架调用。

       RecordReader

       RecordReader是用来根据给的InputSplit创建键值对的。因为inputsplit表示了分片的字节范围,使mapper的处理有意义。这就是为什么hadoophe MapReduce被认为是“读时模式”。模式是在RecordReader中定义的,单独的基于RecordReader的实现,而不是基于我们希望的job的输入。从输入源读取字节转换成writablecomparable key和一个writable value。创建自定义输入格式时经常使用自定义的类型,因为这是一种好的面向对象编程的方式来把信息给mapper。
      RecordReader使用数据和由inputsplit创建的边界生成键值对。在基于文件的输入的环境中,“start“是文件中的RecordReader应该开始生成键值对的字节偏移量。“end”是应该停止读记录的偏移量。就api而言,没有硬性的边界:不能阻止一个开发人员把整个文件作为一个map的输入,当然这是不建议的,经常需要越过边界读数据,来保证读到一条完整的记录。
      考虑xml的问题。当使用TextInputFormat抽取每行时,xml元素通常不在同一行,会被MapReduce input 分割。当读到输入分区边界的“end“之后,就得到一条完整记录。找到记录的末尾以后,你仅需要保证每条记录的读从xml元素的开始开始。找到inputsplit的开始之后,继续读直到开始的xml标签被读到。这允许MapReduce框架覆盖整个xml文件的内容,但不会重复任何xml记录。由于向前找xml元素的开始而跳过的xml内容会被前面的map任务处理。

       recordReader 抽象类有几个方法要覆盖。
       Initialize
           把map任务指定的inputSplit和TaskAttemptContext作为本方法的参数。对基于文件的输入格式,这是寻找开始读文件时的字节偏移的好时机。
       GetCurrentKey and getCurrentValue
           这两个方法被框架使用生成键值对发送给mapper。尽可能重用这两个方法返回的对象
       nextKeyValue
           类似inputFormat类里的对应方法,读一个简单的键值对并返回true,直到数据读完。
      GetProgress
            这是个可选的方法,用于框架对度量的收集。
      Close
            由框架使用,在没有键值对要处理时清除资源。

具体实现如下:

点击(此处)折叠或打开

  1. import org.apache.hadoop.fs.Path;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.io.Writable;
  4. import org.apache.hadoop.mapreduce.*;
  5. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  6. import org.apache.log4j.Logger;
  7. import redis.clients.jedis.Jedis;

  8. import java.io.DataInput;
  9. import java.io.DataOutput;
  10. import java.io.IOException;
  11. import java.util.ArrayList;
  12. import java.util.Iterator;
  13. import java.util.List;
  14. import java.util.Map;

  15. public class RedisInputFormatJob {

  16.  public static class RedisInputFormat extends InputFormat<Text, Text>{

  17.      public static final String REDIS_HOSTS_CONF = "mapred.redisinputformat.hosts";
  18.      public static final String REDIS_HASH_KEY_CONF = "mapred.redisinputformat.key";
  19.      private static final Logger LOG = Logger.getLogger(RedisInputFormat.class);

  20.      public static void setRedisHostsConf(Job job, String hosts ){
  21.          job.getConfiguration().set(REDIS_HOSTS_CONF,hosts);
  22.      }

  23.      public static void setRedisHashKeyConf(Job job, String key){
  24.          job.getConfiguration().set(REDIS_HASH_KEY_CONF, key);
  25.      }

  26.      @Override
  27.      public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  28.          String hosts = context.getConfiguration().get(REDIS_HOSTS_CONF);
  29.          String key = context.getConfiguration().get(REDIS_HASH_KEY_CONF);
  30.          List<InputSplit> splits = new ArrayList<InputSplit>();
  31.          for (String host:hosts.split(",")){
  32.              splits.add(new RedisInputSplit(host, key));
  33.          }

  34.          return splits;
  35.      }

  36.      @Override
  37.      public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  38.          return new RedisRecordReader();
  39.      }
  40.  }

  41.     public static class RedisRecordReader extends RecordReader<Text, Text>{

  42.         private static final Logger LOG = Logger.getLogger(RedisRecordReader.class);
  43.         private Iterator<Map.Entry<String, String>> keyValueMapIter = null;
  44.         private Text key = new Text(), value = new Text();
  45.         private float processedKVs = 0, totalKVs = 0;
  46.         private Map.Entry<String, String> currentEntry = null;

  47.         public RedisRecordReader(){

  48.         }
  49.         @Override
  50.         public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  51.             String redisHost = split.getLocations()[0];
  52.             String redisKey = ((RedisInputSplit)split).getKey();
  53.             Jedis jedis = new Jedis(redisHost, 7379);
  54.             jedis.connect();
  55.             jedis.getClient().setTimeoutInfinite();
  56.             totalKVs = jedis.hlen(redisKey);
  57.             keyValueMapIter = jedis.hgetAll(redisKey).entrySet().iterator();
  58.             LOG.info("Got " + totalKVs + " from " + redisKey);
  59.             jedis.disconnect();
  60.         }

  61.         @Override
  62.         public boolean nextKeyValue() throws IOException, InterruptedException {
  63.             if(keyValueMapIter.hasNext()){
  64.                 currentEntry = keyValueMapIter.next();
  65.                 key.set(currentEntry.getKey());
  66.                 value.set(currentEntry.getValue());
  67.                 return true;
  68.             }
  69.             return false;
  70.         }

  71.         @Override
  72.         public Text getCurrentKey() throws IOException, InterruptedException {
  73.             return key;
  74.         }

  75.         @Override
  76.         public Text getCurrentValue() throws IOException, InterruptedException {
  77.             return value;
  78.         }

  79.         @Override
  80.         public float getProgress() throws IOException, InterruptedException {
  81.             return 0;
  82.         }

  83.         @Override
  84.         public void close() throws IOException {

  85.         }
  86.     }

  87.     public static class RedisInputSplit extends InputSplit implements Writable {
  88.        private String hosts;
  89.        private String key;
  90.        public RedisInputSplit(){

  91.         }
  92.        public RedisInputSplit(String hosts, String key){
  93.            this.hosts = hosts;
  94.            this.key = key;

  95.         }
  96.         public long getLength() throws IOException, InterruptedException {
  97.             return 0;
  98.         }

  99.         public String getKey(){
  100.             return this.key;
  101.         }
  102.         @Override
  103.         public String[] getLocations() throws IOException, InterruptedException {
  104.             return new String[]{hosts};
  105.         }

  106.         @Override
  107.         public void write(DataOutput out) throws IOException {
  108.             out.writeUTF(hosts);
  109.             out.writeUTF(key);
  110.         }

  111.         @Override
  112.         public void readFields(DataInput in) throws IOException {
  113.             this.hosts = in.readUTF();
  114.             this.key = in.readUTF();

  115.         }
  116.     }

  117.     public static void main(String []args){

  118.         try {
  119.             Job job = Job.getInstance();
  120.             job.setJobName("RedisInputFormatJob");
  121.             job.setJarByClass(RedisInputFormat.class);

  122.             job.setNumReduceTasks(0);
  123.             job.setInputFormatClass(RedisInputFormat.class);
  124.             RedisInputFormat.setRedisHostsConf(job,args[0]);
  125.             RedisInputFormat.setRedisHashKeyConf(job, args[1]);

  126.             job.setOutputFormatClass(TextOutputFormat.class);
  127.             TextOutputFormat.setOutputPath(job, new Path(args[2]));

  128.            job.setOutputKeyClass(Text.class);
  129.             job.setOutputValueClass(Text.class);

  130.             System.out.println(job.waitForCompletion(true));

  131.         } catch (IOException e) {
  132.             e.printStackTrace();
  133.         } catch (InterruptedException e) {
  134.             e.printStackTrace();
  135.         } catch (ClassNotFoundException e) {
  136.             e.printStackTrace();
  137.         }
  138.     }



阅读(7274) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~