Chinaunix首页 | 论坛 | 博客
  • 博客访问: 152205
  • 博文数量: 27
  • 博客积分: 531
  • 博客等级: 一等列兵
  • 技术积分: 332
  • 用 户 组: 普通用户
  • 注册时间: 2012-02-25 18:31
文章分类

全部博文(27)

文章存档

2015年(4)

2014年(3)

2013年(6)

2012年(14)

我的朋友

分类: 大数据

2015-10-30 15:18:39

在使用spark的时候,难免写spark程序进行数据分析。根据spark的文档,我们提应用程序的方式是使用对应的spark-submit脚本进行,但是在实际的使用中往往需要程序代码提交用于分析的应用。

查找相关文档,得到如下程序例子:

点击(此处)折叠或打开

  1. public class MyLanucher {

  2.     public static void main(String[] args) throws IOException, InterruptedException {
  3.         SparkLauncher launcher = new SparkLauncher();
  4.         launcher.setAppResource("count.jar"); // 要启动的spark应用包
  5.         launcher.setMainClass("JavaNetworkWordCount");
  6.         launcher.addAppArgs(args);
  7.         launcher.setMaster("yarn-cluster"); // 在yarn-cluster上启动,也可以再local上
  8.         launcher.setConf(SparkLauncher.DRIVER_MEMORY, "512m");
  9.         launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, "512m");
  10.         launcher.setConf(SparkLauncher.EXECUTOR_CORES, "4");

  11.         Process process = launcher.launch();

  12.         InputStream stdInput = process.getInputStream();
  13.         InputStream errInput = process.getErrorStream();

  14.         process.waitFor();

  15.         // 获取dirver进程的输出
  16.         System.out.println("---------------- read msg -----------------");
  17.         dumpInput(stdInput);

  18.         System.out.println("-------------- read err msg ---------------");
  19.         dumpInput(errInput);

  20.         System.out.println("launcher over");
  21.     }

  22.     private static void dumpInput(InputStream input) throws IOException {
  23.         byte[] buff = new byte[1024];

  24.         while (true) {
  25.             int len = input.read(buff);

  26.             if (len < 0) {
  27.                 break;
  28.             }

  29.             System.out.println(new String(buff, 0, len));
  30.         }
  31.     }

  32. }

编译运行该代码:

点击(此处)折叠或打开

  1. * 1. 环境变量:SPARK_HOME=/home/longlong/workspace/spark/spark-1.5.0-bin-hadoop2.6
  2. * 2. 环境变量:YARN_CONF_DIR=/home/longlong/workspace/hadoop-2.7.1/etc/hadoop
  3. * 3. 编译:javac -cp spark-assembly-1.5.0-hadoop2.6.0.jar MyLanucher.java
  4. * 4. 打包:jar -cf launcher.jar MyLanucher.class
  5. * 5. 启动:java -cp spark-assembly-1.5.0-hadoop2.6.0.jar:launcher.jar MyLanucher ip port
该代码启动一个spark应用,在yarn集群上,对应的应用的jar包名字为counter.jar

counter的代码如下:

点击(此处)折叠或打开

  1. public final class JavaNetworkWordCount {
  2.     private static final Pattern SPACE = Pattern.compile(" ");

  3.     @SuppressWarnings({ "serial", "resource" })
  4.     public static void main(String[] args) {
  5.         if (args.length < 2) {
  6.             System.err.println("Usage: JavaNetworkWordCount ");
  7.             System.exit(1);
  8.         }

  9.         // Create the context with a 1 second batch size
  10.         SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
  11.         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  12.         // Create a JavaReceiverInputDStream on target ip:port and count the
  13.         // words in input stream of \n delimited text (eg. generated by 'nc')
  14.         // Note that no duplication in storage level only for running locally.
  15.         // Replication necessary in distributed scenario for fault tolerance.
  16.         JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
  17.                 StorageLevels.MEMORY_AND_DISK_SER);
  18.         JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  19.             @Override
  20.             public Iterable<String> call(String x) {
  21.                 return Arrays.asList(SPACE.split(x));
  22.             }
  23.         });
  24.         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
  25.             @Override
  26.             public Tuple2<String, Integer> call(String s) {
  27.                 return new Tuple2<String, Integer>(s, 1);
  28.             }
  29.         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
  30.             @Override
  31.             public Integer call(Integer i1, Integer i2) {
  32.                 return i1 + i2;
  33.             }
  34.         });

  35.         wordCounts.print();
  36.         ssc.start();
  37.         ssc.awaitTermination();
  38.     }
  39. }

该例子来自于spark自带的example


阅读(1864) | 评论(0) | 转发(0) |
0

上一篇:一段shell代码的解释

下一篇:没有了

给主人留下些什么吧!~~