需要注意:
1、计数类需要实现 Serializable 接口,以支持序列化
2、聚合函数
aggregate 需要 3 个参数:第一个参数是初始值(零值);第二个参数是分区内的累加函数,把每个元素累加到个数和总和中;第三个参数是分区间的合并函数,把各分区得到的个数和总和合并成最终结果
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.api.java.function.Function2;
-
-
import java.io.Serializable;
-
import java.util.Arrays;
-
-
public class BasicAvg {
-
public static class CountAvg implements Serializable {
-
private int num=0;
-
private int total;
-
public CountAvg(int num,int total){
-
this.num=num;
-
this.total=total;
-
}
-
public int avg(){
-
return total/num;
-
}
-
public int getTotal() {
-
return total;
-
}
-
-
public void setTotal(int total) {
-
this.total = total;
-
}
-
-
public int getNum() {
-
return num;
-
}
-
-
public void setNum(int num) {
-
this.num = num;
-
}
-
-
}
-
public static void main(String []args){
-
SparkConf conf = new SparkConf().setMaster("local").setAppName("BasicAvg");
-
JavaSparkContext sc = new JavaSparkContext(conf);
-
-
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));
-
-
CountAvg countAvg = new CountAvg(0, 0);
-
CountAvg avg = rdd.aggregate(countAvg, new Function2<CountAvg, Integer, CountAvg>() {
-
@Override
-
public CountAvg call(CountAvg countAvg, Integer integer) throws Exception {
-
countAvg.num = countAvg.num + 1;
-
countAvg.total = countAvg.total + integer;
-
return countAvg;
-
}
-
}, new Function2<CountAvg, CountAvg, CountAvg>() {
-
@Override
-
public CountAvg call(CountAvg countAvg, CountAvg countAvg2) throws Exception {
-
countAvg.num = countAvg2.num + countAvg.num ;
-
countAvg.total = countAvg2.total + countAvg.total;
-
return countAvg;
-
}
-
});
-
-
System.out.println(avg.avg());
-
-
}
阅读(1692) | 评论(0) | 转发(0) |