Category: Java

2021-07-08 17:26:00

Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MavgaqiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Skip the header row: its byte-offset key is always 0
        if (key.get() == 0) {
            return;
        }
        // Convert the Text value to a Java String
        String data = value.toString();
        // Split the CSV record into its 9 fields
        String[] msgs = data.split(",");

        // Field 6 holds the AQI reading; skip records where it is missing
        if (!msgs[6].equals("N/A")) {
            context.write(new Text(msgs[0]), new IntWritable(Integer.parseInt(msgs[6])));
        }
    }
}
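The post does not show the input file, but its shape can be inferred from the mapper: a comma-separated file with a header row, nine fields per record, the grouping key (apparently a month such as 201601) in field 0, and the AQI reading, or the sentinel "N/A", in field 6. A hypothetical record consistent with that layout (every field other than 0 and 6 is invented for illustration):

201601,59,45,11,0.8,23,57,14,2.1

One caveat: Integer.parseInt(msgs[6]) throws NumberFormatException on any non-numeric value other than "N/A", so a single malformed row would fail the map task; a more defensive mapper would wrap the parse in a try/catch and skip (or count) bad records.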

Reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MavgaqiReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum every value that arrived for this key (e.g. 201601)
        int total = 0;
        int count = 0;
        for (IntWritable v : values) {
            // IntWritable.get() unwraps the value to a plain int
            count++;
            total += v.get();
        }
        // Integer division: the fractional part of the average is truncated
        int avg = total / count;
        // Emit the key with its average AQI as an IntWritable
        context.write(key, new IntWritable(avg));
    }
}
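The original reducer imported FloatWritable without using it, and total / count is integer division, so the average loses its fractional part. If a fractional average was the intent, a minimal sketch of that variant follows (the class name is hypothetical, and the driver would also need job.setOutputValueClass(FloatWritable.class)):

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MavgaqiFloatReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        int count = 0;
        for (IntWritable v : values) {
            count++;
            total += v.get();
        }
        // Cast before dividing so the division happens in floating point
        context.write(key, new FloatWritable((float) total / count));
    }
}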

Main (driver)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MavgaqiMain {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Main class of the job jar
        job.setJarByClass(MavgaqiMain.class);
        // Mapper settings
        job.setMapperClass(MavgaqiMapper.class);
        // Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer settings
        job.setReducerClass(MavgaqiReducer.class);
        // Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths come from the command line
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
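To run the job, package the three classes into a jar and submit it to the cluster; the jar name and HDFS paths below are hypothetical:

hadoop jar mavgaqi.jar MavgaqiMain /input/aqi /output/aqi_avg

FileOutputFormat requires that the output directory not yet exist, so delete /output/aqi_avg (or pick a fresh path) before rerunning. It is also common to end main with System.exit(job.waitForCompletion(true) ? 0 : 1) so the process exit code reflects whether the job succeeded.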
