目的:flume从数据源取数据,并传给spark streaming,spark处理完后直接写入Elastic search。
1,======================环境=========================
spark-1.5.0
elasticsearch-1.7.2
java-1.7.0
flume-1.6.0
2,==================依赖jar包 ==========================
spark-streaming-flume-sink_2.10-1.3.0.jar
scala-library-2.10.4.jar
spark-assembly-1.3.0-cdh5.4.8-hadoop2.6.0-cdh5.4.8.jar(不加载会报找不到类的错误)
将上面的三个jar包放到$flume_home/lib下(版本可换成1.5的)
Elasticsearch Hadoop ? 2.1.0.Beta4(在编译时最好先用mvn命令行把jar加入本地库,否则编译时特别费劲)
将Elasticsearch Hadoop ? 2.1.0.Beta4的路径加到/etc/spark/conf/classpath.txt中
3,================配置和启动flume=========================
A,配置:source和channel的配置可随意(channel如果配成memory的话会报channel已满的错误),
关键是sink的配置,如下
#===============spark sink==============
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 10.1.60.132
a1.sinks.k1.port = 4401
注意:spark-streaming-flume-sink_2.10-1.3.0.jar一定要下载正确否则会报没有SparkSink的错误
B,启动:/opt/apache-flume-1.6.0-bin/bin/flume-ng agent -c conf -f /opt/apache-flume-1.6.0-bin/conf/pull_spark_streaming.conf -n a1 -Dflume.root.logger=INFO,console >log.txt
4,================spark处理数据并写入ES=========================
概要:spark streaming从flume中第隔一段时间(时间长短由自己定)会收到一批数据(rdd),如何处理rdd中的数据,并将
rdd转换成带有列名的DataFrame是关键。
A,从spark streaming从flume中取数据
val flumeStream = FlumeUtils.createPollingStream(ssc, "10.1.60.132", 4401) #ssc是streaming context,后面的flume sink配置的ip和port
val mappedlines = flumeStream.map{flumeEvent=>
val event = flumeEvent.event
val messageBody = new String(event.getBody.array())
messageBody} #messageBody就是flume发送的数据
B,rdd到DataFrame(先切割,后转换成DataFrame)
val indata = rdd.map( x => x.split("\001") )
val esin = indata.map{p => Person(p(0),p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21))}.toDF() #person要定义到main函数外面
#上面是通过case class建立schema的,如果字段太长的话,这种方式就有点笨了。spark提供另一种较省事的办法【参见第二份abc.scala】
C,写入ES
esin.saveToEs(Map(ES_RESOURCE_WRITE->"spark/people",ES_NODES->"10.1.80.171"))
D,启动spark
spark-submit --master yarn-client --name t1 --class flumeStreaming.abc --jars streaming_flume_2-0.0.1-SNAPSHOT.jar flumeStreaming_1.0-1.0.0.jar
5,=====================结果============================
1,成功向ES中灌入63万条交易数据
2,灌入数据的index/type之前就有数据,而且灌入数据的字段与原来的字段不同。
充分证明ES数据库中字段的高可扩展性。
参考:
1,
2, spark-sql:查询临时表时异常(错误No Typetag)
阅读(1407) | 评论(1) | 转发(0) |