全部博文(51)
分类: HADOOP
2017-11-10 14:45:33
运行环境:hadoop2.4.1、hbase 0.98.16-hadoop2 、spark1.4.0
详细流程:
1.spark-submit提交任务到yarn 集群
2.ResourceMaster收到提交到的任务后分配资源,创建ApplicationMaster
3.Executor中的任务起动执行,向zookeeper读取消费kafka
offset
4.拿到offset后,向kafka集群取数据
5.处理数据
6.处理结果保存至HBase
7.处理成功后,提交offset至zookeeper
关于spark on
yarn执行流程:
代码部分比较简单,SparkStreaming是程序入口,根据传入的topic,调用相就的解析器,如NewTvadpv等,都是与topic名称相符 ,解析器会按不同日志规则解析相应的字段,并入HBase。SparkStreaming中配置spark任务参数,如kafka broker list, zookeeper ,及提交offset。
为了方便日后维护,避免数据不连续,需要控制客户端消费kafka offset,所以引入了KafkaManager,并且调用高效的底层api directStream。KafkaManager封装了一层KafkaUtils.createDirectStream方法,在拉取数据前,会设置此topic、groupId 在zookeeper上保存的各分区的消费offset位置。如果此groupid,之前没有消费过,则从最新的数据开始。如果有消费过,且此offset大于kakfa现有最小offset,此从此位置开始消费,否则从kafka中,现有最小的offset开始消费。
在10.16.34.80 运行/opt/bd-warehouse/spark/run.sh 脚本提交8个topic任务
单个任务提交可用如下命令:
topic="tvadfpv"
maxRate=1000
base_home="/opt/bd-warehouse/spark"
nohup /usr/lib/spark/bin/spark-submit
--master yarn-cluster --principal=bd-warehouse/10.16.34.80@HERACLES.SOHUNO.COM
--keytab=/home/bd-warehouse/bd-warehouse.keytab --conf
spark.hadoop.fs.hdfs.impl.disable.cache=true --queue datacenter --class
com.sohu.spark.SparkStreaming --driver-memory 4g --executor-memory 4G
--total-executor-cores 16 ${base_home}/libs/rt-service-1.0-SNAPSHOT.jar ${topic}
${maxRate} >> ${base_home}/logs/${topic}.log 2>&1 &
说明:
--master
yarn-cluster 以yarn cluster模式提交,在yarn集群上运行
由于运行的是spark
streaming,相当于yarn的长任务,所以,需要配置以下三项参数。否则,任务运行7天,TOCKEN超期,导致任务失败。
--principal=bd-warehouse/10.16.34.80@HERACLES.SOHUNO.COM
--keytab=/home/bd-warehouse/bd-warehouse.keytab
--conf
spark.hadoop.fs.hdfs.impl.disable.cache=true
tvadfpv 是程序参数,指从kafka读取数据的topic
5000 设置spark.streaming.kafka.maxRatePerPartition 值,每秒每个partition可处理记录数 tvadfpv为一个分区,且是pv的,量比较大,所以此时值设置为5000,其它topic,如newtvadpv量最大,且为18个分区,值设置为 1500以下,太大会影响处理时间,导致处理延迟越来越严重。
由于在前期上线,任务经常有跑死的现象,所以,监控是必不可少了。Spark Streaming在Yarn上是一个长任务,需要一直运行。目前的监控方法是,使用YARN的API,获取application的状态信息,查看任务是否在Running状态,如果不是,则发出报警。
部分重要代码片断及说明如下:
调用方法如下:
nohup hadoop jar /opt/bd-warehouse/spark/libs/rt-service-1.0-SNAPSHOT-jar-with-dependencies.jar
yarn.YarnAplicationMonitor ${topic} ${appid} >>/opt/bd-warehouse/spark/
monitor/logs/${topic}-monitor.log 2>&1 &
topic名称
appid 类似application_1428668992862_7651596 为应用任务id,可以日志中查看到
数据存入HBase 表BD_WH:REAL_TIME_DATA, 列簇名为:cf
如下:
get
'BD_WH:REAL_TIME_DATA','20160312','cf:click' 获取'20160312' 当天的点击数
get 'BD_WH:REAL_TIME_DATA','20160312','cf:exec' 获取'20160312' 当天的完成数
scan 'BD_WH:REAL_TIME_DATA',{FILTER=>"PrefixFilter('331300|201603281223')"}
获取排期包331300 在201603281223分点击完成:
其中,cf:click-30403 为点击,cf:exec-30403为完成量,30403为平台编码。
二、程序结构:
三、部署上线
四、任务监控
五、结果数据结构