Chinaunix首页 | 论坛 | 博客
  • 博客访问: 167893
  • 博文数量: 51
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 471
  • 用 户 组: 普通用户
  • 注册时间: 2015-05-11 10:24
文章分类

全部博文(51)

文章存档

2018年(3)

2017年(22)

2016年(9)

2015年(17)

我的朋友

分类: HADOOP

2017-11-10 14:45:33

一、rt-service

运行环境hadoop2.4.1hbase 0.98.16-hadoop2 spark1.4.0

详细流程:


1.spark-submit提交任务到yarn 集群

2.ResourceMaster收到提交到的任务后分配资源,创建ApplicationMaster

3.Executor中的任务起动执行,向zookeeper读取消费kafka offset

4.拿到offset后,向kafka集群取数据

5.处理数据

6.处理结果保存至HBase

7.处理成功后,提交offsetzookeeper

 

关于spark on yarn执行流程:


二、程序结构:

代码部分比较简单,SparkStreaming是程序入口,根据传入的topic,调用相就的解析器,如NewTvadpv等,都是与topic名称相符 ,解析器会按不同日志规则解析相应的字段,并入HBaseSparkStreaming中配置spark任务参数,如kafka broker list, zookeeper ,及提交offset

     为了方便日后维护,避免数据不连续,需要控制客户端消费kafka offset,所以引入了KafkaManager,并且调用高效的底层api directStreamKafkaManager封装了一层KafkaUtils.createDirectStream方法,在拉取数据前,会设置此topicgroupId zookeeper上保存的各分区的消费offset位置。如果此groupid,之前没有消费过,则从最新的数据开始。如果有消费过,且此offset大于kakfa现有最小offset,此从此位置开始消费,否则从kafka中,现有最小的offset开始消费。

 

 

三、部署上线

  10.16.34.80 运行/opt/bd-warehouse/spark/run.sh 脚本提交8topic任务

单个任务提交可用如下命令:

 

topic="tvadfpv"

maxRate=1000

base_home="/opt/bd-warehouse/spark"

 

nohup /usr/lib/spark/bin/spark-submit --master yarn-cluster --principal=bd-warehouse/10.16.34.80@HERACLES.SOHUNO.COM --keytab=/home/bd-warehouse/bd-warehouse.keytab --conf spark.hadoop.fs.hdfs.impl.disable.cache=true --queue datacenter --class com.sohu.spark.SparkStreaming --driver-memory 4g --executor-memory 4G --total-executor-cores 16 ${base_home}/libs/rt-service-1.0-SNAPSHOT.jar ${topic} ${maxRate} >> ${base_home}/logs/${topic}.log 2>&1 &

 

说明:

   --master  yarn-cluster yarn cluster模式提交,在yarn集群上运行

   由于运行的是spark streaming,相当于yarn的长任务,所以,需要配置以下三项参数。否则,任务运行7天,TOCKEN超期,导致任务失败。

   --principal=bd-warehouse/10.16.34.80@HERACLES.SOHUNO.COM

--keytab=/home/bd-warehouse/bd-warehouse.keytab

--conf spark.hadoop.fs.hdfs.impl.disable.cache=true

 

tvadfpv  是程序参数,指从kafka读取数据的topic

5000    设置spark.streaming.kafka.maxRatePerPartition 值,每秒每个partition可处理记录数 tvadfpv为一个分区,且是pv的,量比较大,所以此时值设置为5000,其它topic,如newtvadpv量最大,且为18个分区,值设置为 1500以下,太大会影响处理时间,导致处理延迟越来越严重。

 

四、任务监控

由于在前期上线,任务经常有跑死的现象,所以,监控是必不可少了。Spark StreamingYarn上是一个长任务,需要一直运行。目前的监控方法是,使用YARNAPI,获取application的状态信息,查看任务是否在Running状态,如果不是,则发出报警。

部分重要代码片断及说明如下:

调用方法如下:

 

nohup hadoop jar /opt/bd-warehouse/spark/libs/rt-service-1.0-SNAPSHOT-jar-with-dependencies.jar yarn.YarnAplicationMonitor ${topic} ${appid} >>/opt/bd-warehouse/spark/

monitor/logs/${topic}-monitor.log  2>&1 &

topic名称

appid  类似application_1428668992862_7651596 为应用任务id,可以日志中查看到

五、结果数据结构

数据存入HBase BD_WH:REAL_TIME_DATA, 列簇名为:cf

如下:

get 'BD_WH:REAL_TIME_DATA','20160312','cf:click'  获取'20160312' 当天的点击数

get 'BD_WH:REAL_TIME_DATA','20160312','cf:exec'  获取'20160312' 当天的完成数

scan 'BD_WH:REAL_TIME_DATA',{FILTER=>"PrefixFilter('331300|201603281223')"}

获取排期包331300 201603281223分点击完成:

其中,cf:click-30403 为点击,cf:exec-30403为完成量,30403为平台编码。

阅读(863) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~