InputFormat
Hadoop依赖job的输入格式做三件事:
1.校验job的输入配置,例如数据是否存在。
2.分割文件块为逻辑上的inputSplit类型的块,每一个对应一个map任务。
3.创建RecordReader的实现从inputsplit创建键值对。这些键值对一个一个发送到mapper。
最常用的输入格式的子类是FileInputFormat,hadoop默认是TextInputFormat。这个类首先校验job的输入,保证输入路径的存在。然后根据文件字节数大小逻辑分割输入文件,使用块大小作为分割边界值。例如,160M的文件,块大小64M时分成三个逻辑块,0M-64M,64m-128M,128M-160M.每个map任务都会对应其中一个块,然后RecordReader负责生成键值对。
通常,recordReader有额外的修复边界问题的责任,因为输入分割边界是任意的,很有可能不是记录的边界。例如,TextInputFormat使用LineRecordReader读取文本文件对每个map的每一文本行创建键值对,例如用换行符分割。键是读到的一行的字节数,值是整行字符串。因为它不像输入分片的字节块,会用换行符分开,LineRecordRead会负责读到行的末尾保证读到一条完整的记录。不同数据块的这点数据(一个完整行)理论上可能不在相同的节点,所以从所在主机上读。这个读由FSDataInputStream类处理,我们就不必处理去哪儿找数据块。
使用自己的格式时不要害怕经过了分割的边界,只需要检测没有重复或丢失数据。
Notice:自定义的输入格式不限于文件输入。你可以把输入表示为InputSplit对象和键值对,自定义或其它的,可以在一个MapReduce job里并行读入任何东西到map阶段。只需要记住输入分片表示什么和利用数据本地性的优势。
InputFormat抽象类有两个抽象方法:
getSplits
典型的实现是利用JobConText对象获取配置的输入并返回该对象的list。Inputsplit有个方法返回表示数据在集群中位置的机器的数组,提示TaskTracker应该执行的map任务。这个方法也是验证配置的正确性或抛出需要的异常的合适的地方,因为方法使用在前面。(例如在提交到jobTracker之前)
CreateRecordReader
这个方法在后面使用,用来生成RecordReader的实现,随后详细讨论。通常,一个新实例创建并立即返回,因为record reader有个初始化方法被框架调用。
RecordReader
RecordReader是用来根据给的InputSplit创建键值对的。因为inputsplit表示了分片的字节范围,使mapper的处理有意义。这就是为什么hadoophe MapReduce被认为是“读时模式”。模式是在RecordReader中定义的,单独的基于RecordReader的实现,而不是基于我们希望的job的输入。从输入源读取字节转换成writablecomparable key和一个writable value。创建自定义输入格式时经常使用自定义的类型,因为这是一种好的面向对象编程的方式来把信息给mapper。
RecordReader使用数据和由inputsplit创建的边界生成键值对。在基于文件的输入的环境中,“start“是文件中的RecordReader应该开始生成键值对的字节偏移量。“end”是应该停止读记录的偏移量。就api而言,没有硬性的边界:不能阻止一个开发人员把整个文件作为一个map的输入,当然这是不建议的,经常需要越过边界读数据,来保证读到一条完整的记录。
考虑xml的问题。当使用TextInputFormat抽取每行时,xml元素通常不在同一行,会被MapReduce input 分割。当读到输入分区边界的“end“之后,就得到一条完整记录。找到记录的末尾以后,你仅需要保证每条记录的读从xml元素的开始开始。找到inputsplit的开始之后,继续读直到开始的xml标签被读到。这允许MapReduce框架覆盖整个xml文件的内容,但不会重复任何xml记录。由于向前找xml元素的开始而跳过的xml内容会被前面的map任务处理。
recordReader 抽象类有几个方法要覆盖。
Initialize
把map任务指定的inputSplit和TaskAttemptContext作为本方法的参数。对基于文件的输入格式,这是寻找开始读文件时的字节偏移的好时机。
GetCurrentKey and getCurrentValue
这两个方法被框架使用生成键值对发送给mapper。尽可能重用这两个方法返回的对象
nextKeyValue
类似inputFormat类里的对应方法,读一个简单的键值对并返回true,直到数据读完。
GetProgress
这是个可选的方法,用于框架对度量的收集。
Close
由框架使用,在没有键值对要处理时清除资源。
具体实现如下:
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.io.Writable;
-
import org.apache.hadoop.mapreduce.*;
-
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
import org.apache.log4j.Logger;
-
import redis.clients.jedis.Jedis;
-
-
import java.io.DataInput;
-
import java.io.DataOutput;
-
import java.io.IOException;
-
import java.util.ArrayList;
-
import java.util.Iterator;
-
import java.util.List;
-
import java.util.Map;
-
-
public class RedisInputFormatJob {
-
-
public static class RedisInputFormat extends InputFormat<Text, Text>{
-
-
public static final String REDIS_HOSTS_CONF = "mapred.redisinputformat.hosts";
-
public static final String REDIS_HASH_KEY_CONF = "mapred.redisinputformat.key";
-
private static final Logger LOG = Logger.getLogger(RedisInputFormat.class);
-
-
public static void setRedisHostsConf(Job job, String hosts ){
-
job.getConfiguration().set(REDIS_HOSTS_CONF,hosts);
-
}
-
-
public static void setRedisHashKeyConf(Job job, String key){
-
job.getConfiguration().set(REDIS_HASH_KEY_CONF, key);
-
}
-
-
@Override
-
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
-
String hosts = context.getConfiguration().get(REDIS_HOSTS_CONF);
-
String key = context.getConfiguration().get(REDIS_HASH_KEY_CONF);
-
List<InputSplit> splits = new ArrayList<InputSplit>();
-
for (String host:hosts.split(",")){
-
splits.add(new RedisInputSplit(host, key));
-
}
-
-
return splits;
-
}
-
-
@Override
-
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-
return new RedisRecordReader();
-
}
-
}
-
-
public static class RedisRecordReader extends RecordReader<Text, Text>{
-
-
private static final Logger LOG = Logger.getLogger(RedisRecordReader.class);
-
private Iterator<Map.Entry<String, String>> keyValueMapIter = null;
-
private Text key = new Text(), value = new Text();
-
private float processedKVs = 0, totalKVs = 0;
-
private Map.Entry<String, String> currentEntry = null;
-
-
public RedisRecordReader(){
-
-
}
-
@Override
-
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-
String redisHost = split.getLocations()[0];
-
String redisKey = ((RedisInputSplit)split).getKey();
-
Jedis jedis = new Jedis(redisHost, 7379);
-
jedis.connect();
-
jedis.getClient().setTimeoutInfinite();
-
totalKVs = jedis.hlen(redisKey);
-
keyValueMapIter = jedis.hgetAll(redisKey).entrySet().iterator();
-
LOG.info("Got " + totalKVs + " from " + redisKey);
-
jedis.disconnect();
-
}
-
-
@Override
-
public boolean nextKeyValue() throws IOException, InterruptedException {
-
if(keyValueMapIter.hasNext()){
-
currentEntry = keyValueMapIter.next();
-
key.set(currentEntry.getKey());
-
value.set(currentEntry.getValue());
-
return true;
-
}
-
return false;
-
}
-
-
@Override
-
public Text getCurrentKey() throws IOException, InterruptedException {
-
return key;
-
}
-
-
@Override
-
public Text getCurrentValue() throws IOException, InterruptedException {
-
return value;
-
}
-
-
@Override
-
public float getProgress() throws IOException, InterruptedException {
-
return 0;
-
}
-
-
@Override
-
public void close() throws IOException {
-
-
}
-
}
-
-
public static class RedisInputSplit extends InputSplit implements Writable {
-
private String hosts;
-
private String key;
-
public RedisInputSplit(){
-
-
}
-
public RedisInputSplit(String hosts, String key){
-
this.hosts = hosts;
-
this.key = key;
-
-
}
-
public long getLength() throws IOException, InterruptedException {
-
return 0;
-
}
-
-
public String getKey(){
-
return this.key;
-
}
-
@Override
-
public String[] getLocations() throws IOException, InterruptedException {
-
return new String[]{hosts};
-
}
-
-
@Override
-
public void write(DataOutput out) throws IOException {
-
out.writeUTF(hosts);
-
out.writeUTF(key);
-
}
-
-
@Override
-
public void readFields(DataInput in) throws IOException {
-
this.hosts = in.readUTF();
-
this.key = in.readUTF();
-
-
}
-
}
-
-
public static void main(String []args){
-
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("RedisInputFormatJob");
-
job.setJarByClass(RedisInputFormat.class);
-
-
job.setNumReduceTasks(0);
-
job.setInputFormatClass(RedisInputFormat.class);
-
RedisInputFormat.setRedisHostsConf(job,args[0]);
-
RedisInputFormat.setRedisHashKeyConf(job, args[1]);
-
-
job.setOutputFormatClass(TextOutputFormat.class);
-
TextOutputFormat.setOutputPath(job, new Path(args[2]));
-
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
-
System.out.println(job.waitForCompletion(true));
-
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
}
阅读(7274) | 评论(0) | 转发(0) |