最近一直再整逻辑回归在Spark上的实现,本文主要是对Spark上编程的总结,并给出是如何解决过拟合现象。至于逻辑回归,网上的资料一搜一大把。
开始以为,在Spark上编程就是使用scala语言,其它的也没什么不同。但当实现起来确发现真不是那样的。为了提高程序的运行速度,必须借用Spark的RDD结构等特性。如果我们只用scala进行编程,那样就不能充分体现Spark迭代的高效性,经过测试发现16G的数据样本集二百万的特征集时,如果纯用scala那么更新一次权重需要大约7分多钟的时间,如果充分利用Spark的特性(例如,cache等)大约只需20来秒的时间。在Spark上进行编程,给我的最大感触就是各种变换,通过变换得到我们想要的一组向量。这与我之前的编程经历是完全不同的,原来编程时用的是C/C++、Java和Python等,这些语言虽然各有不同,但写起来也都大同小异没什么新奇的然而,在Spark上写程序就不一样啦,虽然Spark是基于scala的,但给人带来的变成观念是完全不同的。Spark主要是通过各种变换,也就是各种transformation得到自己想要的数据,然后利用action得到自己想要的向量,总之在Spark上编程不再是通过“一步步”的计算来得到自己想要的数据而是通过一系列的变换得到自己想要的结果。下面先来看一段代码,是LR模型的梯度下降算法的实现:
-
def sigmoid(xi: Array[Int], w: Array[Double]):Double = {
-
var sum = 0.0
-
-
xi.foreach{i => sum += w(i)}
-
-
if(sum > 35.0)
-
sum = 35.0
-
if(sum < -35.0)
-
sum = -35.0
-
1.0 / (1.0 + math.exp(-sum))
-
}
-
-
/*迭代求权重*/
-
for(i <- 0 until iter){
-
var h = input.map{ case (label,indices) =>
-
(label,indices,sigmoid(indices,weight))
-
}
-
val loss = h.map{ case (label,indices,hh) =>
-
label*math.log(hh)+(1.0-label)*math.log(1.0-hh)
-
}
-
val sum = loss.reduce(_+_)
-
val num = loss.count
-
//loss.collect.foreach(ll => logger.error(ll))
-
logger.error("loss rate: ")
-
-
logger.error(sum)
-
logger.error(num)
-
logger.error(-sum/num)
-
-
//gradient
-
h.map{ case (label,indices,hh) =>
-
(indices,label - hh)
-
}.map{case (indices,e) => indices.map{f => (f,alpha*e)}.toMap
-
}.reduce{case (map1,map2) => map1 ++ map2.map{ case (k,v) => (k,v + map1.getOrElse(k,0.0)) }}
-
.foreach{case(k,v)=>weight(k) += v}
-
}
-
weight.foreach{w => logger.error(w)}
-
weight
-
}
上边几行主要代码是最速下降算法的主要实现部分,可以看到在更新权重时,主要是以下几个部分:(label,indices)=>(label,idices,h)=>(indices,error)=>toMap,最后是合并Map得到我们想要的结果,这个给人的感觉就是充分体现了Spark的变换思想。这几行大麦存在一些问题,就是当特征比较大时会出现过拟合现象。也就是在求loss(15~28)时,出现不正常的结果。为了解决这个问题,尝试过加惩罚函数,但结果不是怎么好。于是采用了分段随机梯度法,大致思想如下:先对数据随机的分成若干段(例如10段),然后对每段实施随机梯度算法,最后对每段求出的权重去均值,即一次全迭代后的权重值,这样的效果相当好。之所以要分段,是为了提高程序的并发性,以便更快地执行,最终的loss值保持在0.48左右,之所以不能再有所提高,是因为我们只实现了步长更新,并没有考虑方向问题。下面是大致代码:
-
def splitSamples(input:RDD[(Double, Array[Int])], splitNum:Int=10, iterNum:Int=2){
-
val logger = Logger.getRootLogger()
-
var splitWeight = new Array[Map[Int,Double]](splitNum)
-
var ww = new Array[Double](1861181)
-
for(j <- 0 until iterNum){
-
//calculate loss
-
val loss = input.map{case (label,indices) => (label,indices,sigmoid(indices,ww))}
-
.map{case (label,indices,hh) => label*math.log(hh)+(1.0-label)*math.log(1.0-hh)}
-
val sum = loss.reduce(_+_)
-
val num = loss.count
-
logger.error("loss rate: ")
-
-
logger.error(sum)
-
logger.error(num)
-
logger.error(-sum/num)
-
-
for(i <- 0 until splitNum){
-
var j = 0
-
train(input.map{case (label,indices) =>
-
j += 1
-
(j,label,indices)
-
}.filter{case(num,label,indices) => num % splitNum == i}
-
.map{case(num,label,indices) => (label,indices)},ww).foreach{case(k,v)=>ww(k) += v/splitNum}
-
}
-
}
-
}
-
-
def sigmoid(xi: Array[Int], w: Array[Double]):Double = {
-
var sum = 0.0
-
-
xi.foreach{i => sum += w(i)}
-
-
if(sum > 35.0)
-
sum = 35.0
-
if(sum < -35.0)
-
sum = -35.0
-
1.0 / (1.0 + math.exp(-sum))
-
}
-
-
def train(input:RDD[(Double, Array[Int])], weight:Array[Double], alpha:Double=0.001):Map[Int,Double]={
-
val rst = input.map{case (label,indices) =>
-
indices.map{ arr =>
-
weight(arr) += alpha*(label-sigmoid(indices,weight))
-
(arr,weight(arr))
-
}.toMap
-
}.reduce(_++_)
-
rst
-
}
-
}
样本格式是和MLib一样的。
总结一下,在Spark上变成遇到的主要问题是变成理念的不同。其中遇到的一个主要问题就是:把一个全局或局部变量加到RDD的transformation中,最转换的过程中变量值是改变的,但在transformation结束后,本机代码却显示没有改变。这个可能是因为Spark对其进行了复制并把其分发给各个主机,导致的。
本文出自:
http://blog.chinaunix.net/uid/28311809/abstract/1.html
阅读(12739) | 评论(0) | 转发(0) |