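When working with Spark, you inevitably end up writing Spark programs for data analysis. According to the Spark documentation, applications are submitted with the spark-submit script, but in practice an analysis application often has to be submitted from program code.
Searching the documentation turns up the following example program, which uses SparkLauncher: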
import java.io.IOException;
import java.io.InputStream;

import org.apache.spark.launcher.SparkLauncher;

public class MyLanucher {

    public static void main(String[] args) throws IOException, InterruptedException {
        SparkLauncher launcher = new SparkLauncher();
        launcher.setAppResource("count.jar"); // the Spark application jar to launch
        launcher.setMainClass("JavaNetworkWordCount");
        launcher.addAppArgs(args);
        launcher.setMaster("yarn-cluster"); // launch on YARN in cluster mode; "local" also works
        launcher.setConf(SparkLauncher.DRIVER_MEMORY, "512m");
        launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, "512m");
        launcher.setConf(SparkLauncher.EXECUTOR_CORES, "4");

        Process process = launcher.launch();

        // Read the output of the launched process while it runs. Calling waitFor()
        // before reading can hang once the child fills a pipe buffer.
        Thread stdOutput = dumpInput(process.getInputStream());
        Thread errOutput = dumpInput(process.getErrorStream());

        process.waitFor();
        stdOutput.join();
        errOutput.join();

        System.out.println("launcher over");
    }

    // Copy a stream to System.out on a background thread (the lambda needs Java 8 or later).
    private static Thread dumpInput(InputStream input) {
        Thread thread = new Thread(() -> {
            byte[] buff = new byte[1024];
            try {
                int len;
                while ((len = input.read(buff)) >= 0) {
                    System.out.write(buff, 0, len);
                }
                System.out.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        return thread;
    }
}
Compile and run MyLanucher:
* 1. Environment variable: SPARK_HOME=/home/longlong/workspace/spark/spark-1.5.0-bin-hadoop2.6
* 2. Environment variable: YARN_CONF_DIR=/home/longlong/workspace/hadoop-2.7.1/etc/hadoop
* 3. Compile: javac -cp spark-assembly-1.5.0-hadoop2.6.0.jar MyLanucher.java
* 4. Package: jar -cf launcher.jar MyLanucher.class
* 5. Run: java -cp spark-assembly-1.5.0-hadoop2.6.0.jar:launcher.jar MyLanucher ip port
Here ip and port are the host and port of the text source; addAppArgs passes them on to the launched application.
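SparkLauncher.launch() starts the spark-submit script under SPARK_HOME as a child process, so the settings in MyLanucher correspond roughly to one spark-submit command line. The sketch below only assembles and prints that command to make the mapping visible. The class SubmitCommandSketch and its buildCommand helper are hypothetical names introduced for illustration, not part of Spark, and the exact arguments the real launcher generates may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark: shows how launcher settings map to spark-submit options.
public class SubmitCommandSketch {

    // Build the argument list of a spark-submit call that mirrors the launcher settings.
    public static List<String> buildCommand(String sparkHome, String master, String mainClass,
                                            Map<String, String> conf, String appJar,
                                            String... appArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(sparkHome + "/bin/spark-submit");
        cmd.add("--master");
        cmd.add(master);
        cmd.add("--class");
        cmd.add(mainClass);
        // Each setConf(key, value) becomes one "--conf key=value" pair.
        for (Map.Entry<String, String> e : conf.entrySet()) {
            cmd.add("--conf");
            cmd.add(e.getKey() + "=" + e.getValue());
        }
        cmd.add(appJar);                    // setAppResource
        cmd.addAll(Arrays.asList(appArgs)); // addAppArgs
        return cmd;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.driver.memory", "512m");   // SparkLauncher.DRIVER_MEMORY
        conf.put("spark.executor.memory", "512m"); // SparkLauncher.EXECUTOR_MEMORY
        conf.put("spark.executor.cores", "4");     // SparkLauncher.EXECUTOR_CORES
        List<String> cmd = buildCommand("$SPARK_HOME", "yarn-cluster", "JavaNetworkWordCount",
                conf, "count.jar", "ip", "port");
        System.out.println(String.join(" ", cmd));
    }
}
```

Running the printed command by hand in a shell with the same environment variables set is a quick way to check whether a problem lies in the launcher code or in the Spark setup itself.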
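MyLanucher starts a Spark application on the YARN cluster; the application's jar is count.jar, the name passed to setAppResource.
The code of that application is as follows: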
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class JavaNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    @SuppressWarnings({ "serial", "resource" })
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
            System.exit(1);
        }

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

        // Create a JavaReceiverInputDStream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by 'nc')
        // Note that no duplication in storage level only for running locally.
        // Replication necessary in distributed scenario for fault tolerance.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
                StorageLevels.MEMORY_AND_DISK_SER);

        // Split every line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(SPACE.split(x));
            }
        });

        // Map each word to (word, 1), then sum the counts per word within each batch
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
This example is taken from the examples bundled with Spark.
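To make the flatMap, mapToPair and reduceByKey chain easier to follow, here is a minimal plain-Java sketch of the same counting applied to the lines of a single batch, without Spark. The class BatchWordCountSketch and its countWords method are hypothetical names used only for illustration; the streaming job applies the equivalent logic to every one-second batch, distributed across executors.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical illustration, not part of Spark: word count over one batch of lines.
public class BatchWordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    // Count the words in one batch, mirroring flatMap -> mapToPair -> reduceByKey.
    public static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable output
        for (String line : lines) {
            for (String word : SPACE.split(line)) {    // flatMap: line -> words
                Integer old = counts.get(word);        // mapToPair: each word counts as 1
                counts.put(word, old == null ? 1 : old + 1); // reduceByKey: sum per word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {a=2, b=2, c=1}
        System.out.println(countWords(Arrays.asList("a b a", "b c")));
    }
}
```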