Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1108624
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2018-11-04 15:20:42

需要注意:
1、计数类需要继承Serializable实现序列化
2、聚合函数aggregate需要3个参数,第一个参数初始化值,第二个参数累计个数和数值,第三个参数累计个数和数值总数

点击(此处)折叠或打开

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaRDD;
  3. import org.apache.spark.api.java.JavaSparkContext;
  4. import org.apache.spark.api.java.function.Function2;

  5. import java.io.Serializable;
  6. import java.util.Arrays;

  7. public class BasicAvg {
  8.     public static class CountAvg implements Serializable {
  9.         private int num=0;
  10.         private int total;
  11.        public CountAvg(int num,int total){
  12.             this.num=num;
  13.             this.total=total;
  14.         }
  15.         public int avg(){
  16.             return total/num;
  17.         }
  18.         public int getTotal() {
  19.             return total;
  20.         }

  21.         public void setTotal(int total) {
  22.             this.total = total;
  23.         }

  24.         public int getNum() {
  25.             return num;
  26.         }

  27.         public void setNum(int num) {
  28.             this.num = num;
  29.         }

  30.     }
  31.     public static void main(String []args){
  32.         SparkConf conf = new SparkConf().setMaster("local").setAppName("BasicAvg");
  33.         JavaSparkContext sc = new JavaSparkContext(conf);

  34.         JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));

  35.         CountAvg countAvg = new CountAvg(0, 0);
  36.         CountAvg avg = rdd.aggregate(countAvg, new Function2<CountAvg, Integer, CountAvg>() {
  37.              @Override
  38.              public CountAvg call(CountAvg countAvg, Integer integer) throws Exception {
  39.                  countAvg.num = countAvg.num + 1;
  40.                  countAvg.total = countAvg.total + integer;
  41.                  return countAvg;
  42.              }
  43.          }, new Function2<CountAvg, CountAvg, CountAvg>() {
  44.              @Override
  45.              public CountAvg call(CountAvg countAvg, CountAvg countAvg2) throws Exception {
  46.                  countAvg.num = countAvg2.num + countAvg.num ;
  47.                  countAvg.total = countAvg2.total + countAvg.total;
  48.                  return countAvg;
  49.              }
  50.          });

  51.         System.out.println(avg.avg());

  52.     }

阅读(1666) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~