Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1115158
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2018-07-26 22:37:07

学习spark之后,第一次实践操作写个wordCount,在实践中遇到问题

1、开发环境和服务器运行环使用的spark类库版本不一致达,导致在本地开发环境运行正常,但提交服务器运行报错,需要开发环境和运行环境spark版本一致

2、在驱动程序中设置sparktconf 需要注意,如果环境变量设置不对,也有可能报错

具体实现如下:


点击(此处)折叠或打开

  1. public static void main(String []args){
  2.         String master=args[0];
  3.         String inputFile=args[1];
  4.         String outputFile=args[2];
  5.         SparkConf conf = new SparkConf().setMaster(master).setAppName("wordCount");
  6.         JavaSparkContext sc = new JavaSparkContext(conf);
  7.         JavaRDD<String> input = sc.textFile(inputFile);

  8.         JavaRDD<String> wordList = input.flatMap(new FlatMapFunction<String, String>() {
  9.             @Override
  10.             public Iterator<String> call(String s) throws Exception {
  11.                 //读取每行数据 并按空格分隔
  12.                 return Arrays.asList(s.split(" ")).iterator();
  13.             }
  14.         });

  15.        JavaPairRDD<String, Integer> countList = wordList.mapToPair(new PairFunction<String, String, Integer>() {
  16.            @Override
  17.            public Tuple2<String, Integer> call(String s) throws Exception {
  18.                //计数 生成key value
  19.                return new Tuple2<String, Integer>(s, 1);
  20.            }
  21.        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
  22.            @Override
  23.            public Integer call(Integer v1, Integer v2) throws Exception {
  24.                //按key聚合
  25.                return v1+v2;
  26.            }
  27.        });
  28.         System.out.println(StringUtils.join(countList.collect(),","));
  29.         countList.saveAsTextFile(outputFile);
  30.     }



阅读(1134) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~