Spark有两种类型的共享变量:
累加器(accumulator):累加器用来对信息进行聚合,累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数
广播变量(broadcast variable):广播变量用来高效分发较大的对象,让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个Spark 操作使用。比如,如果你的应用需要向所有节点发
送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。
-
import org.apache.commons.lang.StringUtils;
-
import org.apache.spark.Accumulator;
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.api.java.function.FlatMapFunction;
-
import org.apache.spark.api.java.function.Function2;
-
import org.apache.spark.api.java.function.PairFunction;
-
import org.apache.spark.broadcast.Broadcast;
-
import scala.Tuple2;
-
-
import java.util.ArrayList;
-
import java.util.Arrays;
-
import java.util.Iterator;
-
-
public class WordCount {
-
-
public static void main(String []args){
-
// String master=args[0];
-
// String inputFile=args[1];
-
// String outputFile=args[2];
-
String master="local";
-
String inputFile="D:\\test.txt";
-
String outputFile="D:\\test.out";;
-
SparkConf conf = new SparkConf().setMaster(master).setAppName("wordCount");
-
JavaSparkContext sc = new JavaSparkContext(conf);
-
JavaRDD<String> input = sc.textFile(inputFile);
-
-
//共享变量累加器
-
final Accumulator<Integer> blankLines = sc.accumulator(0);
-
-
//共享变量广播
-
ArrayList<String> list = new ArrayList<String>(){{add("a");add("t");}};
-
final Broadcast<Object[]> broadcast = sc.broadcast(list.toArray());
-
JavaRDD<String> wordList = input.flatMap(new FlatMapFunction<String, String>() {
-
@Override
-
public Iterator<String> call(String s) throws Exception {
-
//统计为t的行数
-
if(s.equals("t")){
-
blankLines.add(1);
-
}
-
-
//判断值是否在共享变量中
-
if(broadcast.value().toString().contains(s)){
-
System.out.println("this is broadcast value:" + s);
-
}
-
//读取每行数据 并按空格分隔
-
return Arrays.asList(s.split(" ")).iterator();
-
}
-
});
-
-
-
JavaPairRDD<String, Integer> countList = wordList.mapToPair(new PairFunction<String, String, Integer>() {
-
@Override
-
public Tuple2<String, Integer> call(String s) throws Exception {
-
//计数 生成key value
-
return new Tuple2<String, Integer>(s, 1);
-
}
-
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
-
@Override
-
public Integer call(Integer v1, Integer v2) throws Exception {
-
//按key聚合
-
return v1+v2;
-
}
-
});
-
System.out.println(StringUtils.join(countList.collect(),","));
-
//打印累加器的值
-
System.out.println("blankLines:"+ blankLines.value());
-
countList.saveAsTextFile(outputFile);
-
}
阅读(2534) | 评论(0) | 转发(0) |