分类: 云计算
2018-06-30 07:34:50
分层结构模式是从数据中创造出不同于原始结构的新记录,其目的主要是将基于行的数据转换成分层的格式,如 JSON、XML 或其他所需的格式。
主要原理是采用MultipleInputs将多个输入文件,通过不同的Map类,加入到Mapper阶段,在map类中通过打标签的方式来标识数据来源,在reduce阶段中根据map中打的标识来进行数据处理。
实现步骤:
1、自定义组合Key,包含主键和flag
2、根据输入数据来源定义不同的Map类,在Map中输出时需要通过自定义的组合键的flag标识数据来源,并且输出组合键
3、在reduce阶段,根据组合键中的flag区分数据来源,按不同需求分别处理并输出
4、在驱动程序中,使用MultipleInputs进行输入
具体实现如下:
点击(此处)折叠或打开
- public class MultiInputJoinKey implements WritableComparable<MultiInputJoinKey>{
- private Text key = new Text();
- public IntWritable getType() {
- return type;
- }
- public void setType(IntWritable type) {
- this.type = type;
- }
- public Text getKey() {
- return key;
- }
- public void setKey(Text key) {
- this.key = key;
- }
- private IntWritable type = new IntWritable();
- public MultiInputJoinKey(){
- }
- public MultiInputJoinKey(Text key, IntWritable type){
- this.key = key;
- this.type = type;
- }
- @Override
- public int compareTo(MultiInputJoinKey o) {
- int ret = this.key.compareTo(o.key);
- return ret;
- }
- @Override
- public void write(DataOutput out) throws IOException {
- this.key.write(out);
- this.type.write(out);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- this.key.readFields(in);
- this.type.readFields(in);
- }
点击(此处)折叠或打开
- public class MultiInputJoinStatJob {
- public static class MultiInputNameMapper extends Mapper<LongWritable, Text, MultiInputJoinKey, Text> {
- @Override
- protected void map(LongWritable key, Text value,
- Context context) throws IOException, InterruptedException {
- String []arrays = value.toString().split("\t");
- System.out.println("MultiInputNameMapper data:"+value.toString());
- MultiInputJoinKey multiInputJoinKey = new MultiInputJoinKey(new Text(arrays[1]), new IntWritable(1));
- context.write(multiInputJoinKey, value);
- }
- }
- public static class MultiInputPriceMapper extends Mapper<LongWritable, Text, MultiInputJoinKey, Text> {
- @Override
- protected void map(LongWritable key, Text value,
- Context context) throws IOException, InterruptedException {
- String []arrays = value.toString().split("\t");
- System.out.println("MultiInputPriceMapper data:"+value.toString());
- MultiInputJoinKey multiInputJoinKey = new MultiInputJoinKey(new Text(arrays[0]), new IntWritable(2));
- context.write(multiInputJoinKey, value);
- }
- }
- public static class MultiInputReduce extends Reducer<MultiInputJoinKey, Text, NullWritable, Text>{
- @Override
- protected void reduce(MultiInputJoinKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- int count=0;
- String name="";
- String price="";
- for (Text text :values){
- String []arrays = text.toString().split("\t");
- if(key.getType().get() ==1){//name mapper
- name=arrays[1];
- }else {//price mapper
- price= arrays[2];
- }
- count++;
- }
- if(name.length()>0){
- String data = name+" "+price +" "+count;
- System.out.println("MultiInputReduce data:"+ data);
- context.write(NullWritable.get(),new Text(data));
- }
- }
- }
- public static void main(String []args){
- try {
- Job job = Job.getInstance();
- job.setJobName("MultiInputJoinStatJob");
- job.setJarByClass(MultiInputJoinStatJob.class);
- String []arrays = new GenericOptionsParser(args).getRemainingArgs();
- MultipleInputs.addInputPath(job, new Path(arrays[0]), TextInputFormat.class, MultiInputNameMapper.class);
- MultipleInputs.addInputPath(job,new Path(arrays[1]), TextInputFormat.class, MultiInputPriceMapper.class);
- FileOutputFormat.setOutputPath(job, new Path(arrays[2]));
- FileSystem fs = FileSystem.get(new URI(arrays[2]),job.getConfiguration());
- if(fs.exists(new Path(arrays[2]))){
- fs.delete(new Path(arrays[2]),true);
- }
- job.setMapOutputKeyClass(MultiInputJoinKey.class);
- job.setMapOutputValueClass(Text.class);
- job.setReducerClass(MultiInputReduce.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
- job.setNumReduceTasks(1);
- boolean ret = job.waitForCompletion(true);
- System.out.println("ret:"+ret);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
- }
- }