Chinaunix首页 | 论坛 | 博客
  • 博客访问: 469247
  • 博文数量: 153
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1575
  • 用 户 组: 普通用户
  • 注册时间: 2016-12-20 17:02
文章分类

全部博文(153)

文章存档

2017年(111)

2016年(42)

我的朋友

分类: 系统运维

2016-12-27 16:33:21

1 在线学习

模型随着接收的新消息,不断更新自己;而不是像离线训练一次次重新训练。

2 Spark Streaming

  • 离散化流(DStream)
  • 输入源:Akka actors、消息队列、Flume、Kafka、……

  • 类群(lineage):应用到RDD上的转换算子和执行算子的集合

3 MLib+Streaming应用

3.0 build.sbt

依赖Spark MLlib和Spark Streaming

name := "scala-spark-streaming-app" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.1" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.5.1" 
使用国内镜像仓库

~/.sbt/repositories

[repositories]
local
osc: 
typesafe:  [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots 

3.1 生产消息

object StreamingProducer { def main(args: Array[String]) { val random = new Random() // Maximum number of events per second val MaxEvents = 6 // Read the list of possible names val namesResource = this.getClass.getResourceAsStream("/names.csv") val names = scala.io.Source.fromInputStream(namesResource)
      .getLines()
      .toList
      .head
      .split(",")
      .toSeq // Generate a sequence of possible products val products = Seq( "iPhone Cover" -> 9.99, "Headphones" -> 5.49, "Samsung Galaxy Cover" -> 8.95, "iPad Cover" -> 7.49 ) /** Generate a number of random product events */ def generateProductEvents(n: Int) = {
      (1 to n).map { i => val (product, price) = products(random.nextInt(products.size)) val user = random.shuffle(names).head
        (user, product, price)
      }
    } // create a network producer val listener = new ServerSocket(9999)
    println("Listening on port: 9999") while (true) { val socket = listener.accept() new Thread() { override def run = {
          println("Got client connected from: " + socket.getInetAddress) val out = new PrintWriter(socket.getOutputStream(), true) while (true) { Thread.sleep(1000) val num = random.nextInt(MaxEvents) val productEvents = generateProductEvents(num)
            productEvents.foreach{ event =>
              out.write(event.productIterator.mkString(","))
              out.write("\n")
            }
            out.flush()
            println(s"Created $num events...")
          }
          socket.close()
        }
      }.start()
    }
  }
} 
sbt run

Multiple main classes detected, select one to run: [1] MonitoringStreamingModel [2] SimpleStreamingApp [3] SimpleStreamingModel [4] StreamingAnalyticsApp [5] StreamingModelProducer [6] StreamingProducer [7] StreamingStateApp

Enter number: 6 

3.2 打印消息

阅读全文请点击:
阅读(1864) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~