Chinaunix首页 | 论坛 | 博客
  • 博客访问: 74654
  • 博文数量: 29
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 272
  • 用 户 组: 普通用户
  • 注册时间: 2015-01-05 20:32
文章分类

全部博文(29)

文章存档

2016年(2)

2015年(27)

我的朋友

分类: 大数据

2016-06-04 11:23:52

目的:flume从数据源取数据,并传给spark streaming,spark处理完后直接写入Elastic search。
1,======================环境=========================
spark-1.5.0
elasticsearch-1.7.2
java-1.7.0
flume-1.6.0

2,==================依赖jar包 ==========================
spark-streaming-flume-sink_2.10-1.3.0.jar
scala-library-2.10.4.jar
spark-assembly-1.3.0-cdh5.4.8-hadoop2.6.0-cdh5.4.8.jar(不加载会报找不到类的错误)
将上面的三个jar包放到$flume_home/lib下(版本可换成1.5的)

Elasticsearch Hadoop ? 2.1.0.Beta4(在编译时最好先用mvn命令行把jar加入本地库,否则编译时特别费劲)
Elasticsearch Hadoop ? 2.1.0.Beta4的路径加到/etc/spark/conf/classpath.txt中
(官方文档)


3,================配置和启动flume=========================
A,配置:source和channel的配置可随意(channel如果配成memory的话会报channel已满的错误),
关键是sink的配置,如下
#===============spark sink==============
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 10.1.60.132
a1.sinks.k1.port = 4401
注意spark-streaming-flume-sink_2.10-1.3.0.jar一定要下载正确否则会报没有SparkSink的错误
B,启动:/opt/apache-flume-1.6.0-bin/bin/flume-ng agent  -c conf  -f /opt/apache-flume-1.6.0-bin/conf/pull_spark_streaming.conf  -n a1  -Dflume.root.logger=INFO,console >log.txt

4,================spark处理数据并写入ES=========================
概要:spark streaming从flume中第隔一段时间(时间长短由自己定)会收到一批数据(rdd),如何处理rdd中的数据,并将
      rdd转换成带有列名的DataFrame是关键。
A,从spark streaming从flume中取数据
       val flumeStream = FlumeUtils.createPollingStream(ssc, "10.1.60.132", 4401)    #ssc是streaming context,后面的flume sink配置的ip和port
        val mappedlines = flumeStream.map{flumeEvent=>
          val event = flumeEvent.event
          val messageBody = new String(event.getBody.array())
          messageBody}        #messageBody就是flume发送的数据

B,rdd到DataFrame(先切割,后转换成DataFrame)
          val indata = rdd.map( x => x.split("\001") )
          val esin = indata.map{p => Person(p(0),p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15), p(16), p(17), p(18), p(19), p(20), p(21))}.toDF()    #person要定义到main函数外面
         #上面是通过case class建立schema的,如果字段太长的话,这种方式就有点笨了。spark提供另一种较省事的办法【参见第二份abc.scala】
           

C,写入ES
esin.saveToEs(Map(ES_RESOURCE_WRITE->"spark/people",ES_NODES->"10.1.80.171"))
     
D,启动spark
spark-submit --master yarn-client --name t1 --class flumeStreaming.abc --jars streaming_flume_2-0.0.1-SNAPSHOT.jar  flumeStreaming_1.0-1.0.0.jar

5,=====================结果============================
1,成功向ES中灌入63万条交易数据
2,灌入数据的index/type之前就有数据,而且灌入数据的字段与原来的字段不同。
      充分证明ES数据库中字段的高可扩展性。

参考:
1,  
2,   spark-sql:查询临时表时异常(错误No Typetag)
3,(经典)
阅读(1419) | 评论(1) | 转发(0) |
0

上一篇:learning apache kafka读书笔记

下一篇:没有了

给主人留下些什么吧!~~

jiedushi2016-11-30 21:08:44

请问博主   spark提供另一种较省事的办法【参见第二份abc.scala】链接在哪里可以看到呢?