2017年(16)
分类: 大数据
2017-03-10 17:55:32
版权申明:转载请注明出处。
文章来源:
实际生产中会有许多应用到实时处理的场景,比如:实时监测页面点击,实时监测系统异常,实时监测来自于外部的攻击。针对这些场景,twitter研发了实时数据处理工具storm,并在后来开源。spark针对这些场景设计了spark-streaming实时计算模型,它允许用户使用一系列批处理的API去处理实时数据,能做到代码逻辑的重复使用。
和spark中的rdd非常相似,spark-streaming中使用离散化流(discretized stream)作为抽象的表示,叫做DStream。它是随时间推移而收集数据的序列,每个时间段收集到的数据在DStream内部以一个RDD的形式存在。DStream支持从kafka,flume,hdfs,s3等获取输入。DStream也支持两种操作,即转化操作和输出操作(区别于RDD中的行动操作)。转化操作又分为无状态的转化操作和有状态的转化操作,无状态的转化操作有map,filter,flatmap,repartition等,是针对单个时间区间内的操作。而有状态的转化操作可以针对不同的时间区间,后面详述。
2.1 监听socket获取数据,代码如下:
这里使用nc -lk 9999 在ip为10.121.33.44的机器上发送消息
scala 17行
object SocketStream { def main(args: Array[String]): Unit = { //本地测试,设置4核 val conf = new SparkConf().setMaster("local[4]").setAppName("streaming") //以10秒为一个批次 val ssc = new StreamingContext(conf,Seconds(10)) //接收消息 val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER) //监测关键字error,出现则print dstream.filter(_.contains("error")).foreachRDD(rdd=>{ rdd.foreach(println(_)) }) ssc.start() ssc.awaitTermination() } }
2.2 从kafka读取数据,比较常用
scala 31行
object KafkaStream { def main(args: Array[String]): Unit = { //本地测试,设置4核 val conf = new SparkConf().setMaster("local[4]").setAppName("streaming") //以10秒为一个批次 val ssc = new StreamingContext(conf,Seconds(10)) val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster" val group_id = "realtime_data" //kafka相关参数 val kafka_param = Map[String,String]( "zookeeper.connect" ->zkQuorum, "group.id" -> group_id, "zookeeper.connection.timeout.ms" -> "10000", "fetch.message.max.bytes" -> "10485760" ) val topic = Map[String,Int]("test_topic" -> 16) //接收消息 val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2) //监测关键字error,出现则print dstream.filter(_.contains("error")).foreachRDD(rdd=>{ rdd.foreach(println(_)) }) ssc.start() ssc.awaitTermination() } }3.再来谈架构
通过上面两个例子,你可能对spark-streaming有了初步的了解,我们再来看一下它的架构。
Spark-streaming使用"微批次"的架构,把流式计算当做一系列微型的批处理操作来对待,每个时间段都产生一个RDD。如图:
作用于一个DStream上的无状态转化操作会对它其中的每个RDD生效,如针对一个输入为语句的DStream做flatMap操作的示意图如下:
4.1 无状态的转化操作。
无状态转化操作就是简单的将转化作用于DStream的每个RDD上面。下面列举了一些常见的转化操作,其中最后一个transform表示可以试用自定义的转化函数,尽管它前面已经提供了很多现成的API。
4.2有状态的转化操作。
有状态的转化操作是跨时间段的数据操作,一些先前的批次也被用来在新的批次中做计算。主要有滑动窗口和updateStateByKey。前者以一个时间段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化。有状态的转化操作需要打开检查点机制来保证容错性。即:给ssc.checkpoint()设置一个检查点目录。
(1)基于窗口的转化操作会在一个比ssc设置的更长的时间段内,通过整合多个批次的,计算出整个大的时间窗口的结果。基于窗口的操作需要两个参数,一个是窗口时长,一个是滑动步长。这两个参数是ssc设置的时长的整数倍。下面的图表示了一个时间窗口为3,滑动步长为2的窗口转化操作。
前面提到的监测关键字error的例子,现在需要每隔20s就对前面30s有error的日志记录做计数,代码如下:
scala 34行
object KafkaStream { def main(args: Array[String]): Unit = { //本地测试,设置4核 val conf = new SparkConf().setMaster("local[4]").setAppName("streaming") //以10秒为一个批次 val ssc = new StreamingContext(conf,Seconds(10)) val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster" val group_id = "realtime_data" //kafka相关参数 val kafka_param = Map[String,String]( "zookeeper.connect" ->zkQuorum, "group.id" -> group_id, "zookeeper.connection.timeout.ms" -> "10000", "fetch.message.max.bytes" -> "10485760" ) val topic = Map[String,Int]("test_topic" -> 16) //接收消息 val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER) .map(_._2) //每隔20s对前30s出现error的日志做计数 val errors = dstream.window(Seconds(30),Seconds(20)) .filter(_.contains("error")) .count() errors.foreachRDD(rdd=>{ rdd.foreach(println(_)) }) ssc.start() ssc.awaitTermination() } }
(2)updateStateByKey
updateStateByKey能对键值对的数据进行不同批次间的数据计算,使用updateStateByKey,需要传入一个update函数,这个函数接收某个key最新批次对应的values,以及该key之前对应的value,按照自定义的逻辑返回一个新的value。如需要计算一个实时日志中http响应码的计数,代码如下:
scala 39行
object KafkaStream { def main(args: Array[String]): Unit = { //输出目录 val output = args(0) //本地测试,设置4核 val conf = new SparkConf().setMaster("local[4]").setAppName("streaming") //以10秒为一个批次 val ssc = new StreamingContext(conf,Seconds(10)) val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster" val group_id = "realtime_data" //kafka相关参数 val kafka_param = Map[String,String]( "zookeeper.connect" ->zkQuorum, "group.id" -> group_id, "zookeeper.connection.timeout.ms" -> "10000", "fetch.message.max.bytes" -> "10485760" ) val topic = Map[String,Int]("test_topic" -> 16) //接收消息 val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2) val rdd = dstream.map(_.split("\001")) .map(x=>(x(0),x(1).toLong)) .updateStateByKey(update) //输出 rdd.foreachRDD(_.saveAsTextFile(output)) ssc.start() ssc.awaitTermination() } //update函数 def update(new_values:Seq[Long],old_value:Option[Long]):Option[Long]={ val current_num = new_values.size val result_num = current_num + old_value.getOrElse(0L) Some(result_num) } }
(3)所有有状态转化操作
输出操作比较简单,有以下几种:
spark-streaming作业一般都要全天候不间断运行,那么作业的稳定性如何保证?主要有以下几点:
6.1 检查点机制。
其原理就是阶段性的将作业运行的数据存放到存储系统,如hdfs,s3等。当作业运行出现异常时可以从上述数据中恢复。
6.2 驱动器容错。
在创建实时计算作业的上下文时使用getOrCreate函数。代码如下:
scala 7行
val ssc = StreamingContext.getOrCreate(cp_dir,createContext ) def createContext(): StreamingContext ={ val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(10)) ssc.checkpoint(cp_dir) }
更多文章请关注微信公众号:bigdataer