MapReduce输入数据源不仅可以是文件还可以是Hbase,从Hbase读取数据需要注意以下细节
1、Mapper类需要从TableMapper继承,并且实现函数
void map(ImmutableBytesWritable key, Result columns, Context context)
ImmutableBytesWritable key 实际上是Hbase表记录的
rowkeyResult columns 是hbase表记录的字段集合通过如下方式获取字段值,
其中RR表示列簇,
createDate是要获取的字段
String createDate = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("createDate")));
2、在驱动程序中需要做如下操作
A)、设置hbase地址和端口
Configuration conf = HBaseConfiguration.create();
//设置HbaseIP地址
conf.set("hbase.zookeeper.quorum", "192.168.10.100");
//设置Hbase port
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(2181));
B)、设置扫描器及caching,这样避免MapReduce输出日志过多导致MR失败
Scan scan = new Scan();
String startRowKey = "ROM.01.";
scan.setStartRow(Bytes.toBytes(startRowKey));scan.setCaching(3000);
C)、设置要读取的hbase表、扫描器,Mapper类及输出key,输出valueTableMapReduceUtil.initTableMapperJob("report", scan, DetailMapper.class, Text.class, Text.class, job);
具体实现如下:
-
import org.apache.commons.cli.*;
-
import org.apache.commons.lang3.StringUtils;
-
import org.apache.commons.logging.Log;
-
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-
import org.apache.hadoop.hbase.mapreduce.TableMapper;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.util.GenericOptionsParser;
-
-
import java.io.IOException;
-
-
-
public class DetailJob {
-
private static final Log log = LogFactory.getLog(DetailJob.class);
-
private static final String NAME = "DetailJob";
-
-
public static class DetailMapper extends TableMapper<Text, Text> {
-
private String key="";
-
@Override
-
protected void setup(Context context){
-
key = context.getConfiguration().get("key");
-
System.out.println("setup key:"+key);
-
}
-
@Override
-
protected void map(ImmutableBytesWritable key, Result columns, Context context) throws IOException,
-
InterruptedException {
-
String emmcId = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("emmcid")));
-
String cpuId = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("cpuid")));
-
String osVersion = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("rom")));
-
String createDate = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("createDate")));
-
String ram = Bytes.toString(columns.getValue(Bytes.toBytes("RR"), Bytes.toBytes("ram")));
-
-
StringBuilder sb = new StringBuilder();
-
sb.append(emmcId).append("\t");
-
sb.append(cpuId ).append("\t");
-
sb.append(osVersion).append("\t");
-
sb.append(ram).append("\t");
-
sb.append(createDate);
-
context.write(new Text(cpuId), new Text(sb.toString()));
-
}
-
}
-
-
public static class DetailReducer extends Reducer<Text, Text,
-
NullWritable,
-
Text> {
-
@Override
-
protected void reduce(Text key, Iterable<Text> values, Context context) throws
-
IOException, InterruptedException {
-
for (Text val : values) {
-
context.write(NullWritable.get(), new Text(val.toString()));
-
}
-
}
-
}
-
-
private static CommandLine parseArgs(String[] args) {
-
Options options = new Options();
-
Option o = new Option("d", "table", true,
-
"table to read from (must exist)");
-
o.setArgName("table-name");
-
o.setRequired(true);
-
options.addOption(o);
-
-
o = new Option("o", "output", true,
-
"the directory to write to");
-
o.setArgName("path-in-HDFS");
-
o.setRequired(true);
-
options.addOption(o);
-
-
o = new Option("k", "key", true,
-
"the directory to write to");
-
o.setArgName("path-in-HDFS");
-
o.setRequired(true);
-
options.addOption(o);
-
-
CommandLineParser parser = new PosixParser();
-
CommandLine cmd = null;
-
try {
-
cmd = parser.parse(options, args);
-
} catch (Exception e) {
-
System.err.println("ERROR: " + e.getMessage() + "\n");
-
HelpFormatter formatter = new HelpFormatter();
-
formatter.printHelp(NAME + " ", options, true);
-
System.exit(-1);
-
}
-
return cmd;
-
}
-
-
public static void main(String[] args) throws Exception {
-
Configuration conf = HBaseConfiguration.create();
-
//设置HbaseIP地址
-
conf.set("hbase.zookeeper.quorum", "192.168.10.100");
-
//设置Hbase port
-
conf.set("hbase.zookeeper.property.clientPort", String.valueOf(2181));
-
-
String[] otherArgs =
-
new GenericOptionsParser(conf, args).getRemainingArgs();
-
CommandLine cmd = parseArgs(otherArgs);
-
String date = cmd.getOptionValue("d");
-
String output = cmd.getOptionValue("o");
-
-
String key = cmd.getOptionValue("k");
-
conf.set("key",key);
-
-
//设置Hbase扫描数据的起始row
-
// String date = cmd.getOptionValue("d");
-
Scan scan = new Scan();
-
if (date != null && date.trim().length() > 0) {
-
//String nextDayStr = getNextDayStr(date);
-
String startRowKey = "ROM.01." + date.replace("-", "");
-
//String endRowKey = "ROM.01." + nextDayStr.replace("-", "");
-
scan.setStartRow(Bytes.toBytes(startRowKey));
-
// scan.setStopRow(Bytes.toBytes(endRowKey));
-
}
-
scan.setCaching(3000);
-
-
Job job = new Job(conf, NAME);
-
job.setJarByClass(DetailJob.class);
-
//设置要读取的hbase表、扫描器,Mapper类及输出key,输出value
-
TableMapReduceUtil.initTableMapperJob("report", scan, DetailMapper.class, Text.class, Text.class, job);
-
job.setReducerClass(DetailReducer.class);
-
job.setOutputKeyClass(NullWritable.class);
-
job.setOutputValueClass(Text.class);
-
job.setNumReduceTasks(1);
-
-
FileOutputFormat.setOutputPath(job, new Path(output));
-
job.waitForCompletion(true);
-
}
阅读(1735) | 评论(0) | 转发(0) |