hiya的技术博客hiyachen.blog.chinaunix.net

大数据、云计算、Linux操作系统、虚拟化

  • 博客访问: 4209465
  • 博文数量: 669
  • 博客积分: 10821
  • 博客等级: 上将
  • 技术积分: 11333
  • 用 户 组: 普通用户
  • 注册时间: 2005-12-02 10:41
  • 认证徽章:
个人简介

专注与操作系统相关的云计算,linux,openstack,spark, hadoop

文章分类

全部博文(669)

微信关注

IT168企业级官微



微信号:IT168qiye



系统架构师大会



微信号:SACC2013

订阅
热词专题

分类: 大数据

Python代码:
  1. #!/usr/bin/env python  
  2. # coding=utf-8  
  3. '''  
  4. 运行命令/yourpath/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile  
  5. movieLensDataDir 电影评分数据集目录 比如 ml-1m/  
  6. personalRatingsFile 需要推荐的某用户的评价数据 格式参考ratings.dat  
  7.   
  8. '''  
  9. import sys  
  10. import itertools  
  11. from math import sqrt  
  12. from operator import add  
  13. from os.path import join, isfile, dirname  
  14.   
  15. from pyspark import SparkConf, SparkContext  
  16. from pyspark.mllib.recommendation import ALS  
  17.   
  18. def parseRating(line):  
  19.     """  
  20.         Parses a rating record in MovieLens format userId::movieId::rating::timestamp .  
  21.     """  
  22.     fields = line.strip().split("::")  
  23.     return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))  
  24.   
  25. def parseMovie(line):  
  26.     """  
  27.         Parses a movie record in MovieLens format movieId::movieTitle .  
  28.     """  
  29.     fields = line.strip().split("::")  
  30.     return int(fields[0]), fields[1]  
  31.   
  32. def loadRatings(ratingsFile):  
  33.     """  
  34.        Load ratings from file.  
  35.     """  
  36.     if not isfile(ratingsFile):  
  37.         print "File %s does not exist." % ratingsFile  
  38.         sys.exit(1)  
  39.     f = open(ratingsFile, 'r')  
  40.     ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1for line in f])  
  41.     f.close()  
  42.     if not ratings:  
  43.         print "No ratings provided."  
  44.         sys.exit(1)  
  45.     else:  
  46.         return ratings  
  47.   
  48. def computeRmse(model, data, n):  
  49.     """  
  50.         Compute RMSE (Root Mean Squared Error).  
  51.     """  
  52.     predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))  
  53.     predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])).join(data.map(lambda x: ((x[0], x[1]), x[2]))).values()  
  54.     return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))  
  55.   
  56. if __name__ == "__main__":  
  57.     if (len(sys.argv) != 3):  
  58.         print "Usage: /path/to/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile"  
  59.         sys.exit(1)  
  60.   
  61.     # set up environment  
  62.     conf = SparkConf().setAppName("MovieLensALS").set("spark.executor.memory""1g")  
  63.     sc = SparkContext(conf=conf)  
  64.   
  65.     # load personal ratings  
  66.     myRatings = loadRatings(sys.argv[2])  
  67.     myRatingsRDD = sc.parallelize(myRatings, 1)  
  68.   
  69.     movieLensHomeDir = sys.argv[1]  
  70.   
  71.     # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))  
  72.     ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)  
  73.   
  74.     #  movies is an RDD of (movieId, movieTitle)  
  75.     movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())  
  76.   
  77.     numRatings = ratings.count()  
  78.     numUsers = ratings.values().map(lambda r: r[0]).distinct().count()  
  79.     numMovies = ratings.values().map(lambda r: r[1]).distinct().count()  
  80.       
  81.     myRatedMovieIds = set([x[1for x in myRatings])  
  82.   
  83.     print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)  
  84.   
  85.      
  86.     # split ratings into train , validation   
  87.     # last digit of the timestamp, add myRatings to train, and cache them  
  88.   
  89.     # training, validation, test are all RDDs of (userId, movieId, rating)  
  90.   
  91.     numPartitions = 4  
  92.     #training = ratings.filter(lambda x: x[0] < 8).values().union(myRatingsRDD).repartition(numPartitions).cache()  
  93.   
  94.     validation = ratings.filter(lambda x: x[0] >= 8 and x[0] < 10).values().repartition(numPartitions).cache()  
  95.   
  96.   
  97.       
  98.   
  99.   
  100.     numTraining = training.count()  
  101.     numValidation = validation.count()  
  102.   
  103.     print "Training: %d, validation: %d" % (numTraining, numValidation)  
  104.   
  105.     # train models and evaluate them on the validation set  
  106.   
  107.     ranks = [10,12]  
  108.     lambdas = [0.01,0.4,1.0]  
  109.     numIters = [10]  
  110.     bestModel = None  
  111.     bestValidationRmse = float("inf")  
  112.     bestRank = 0  
  113.     bestLambda = -1.0  
  114.     bestNumIter = -1  
  115.   
  116.     for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):  
  117.         model = ALS.train(training, rank, numIter, lmbda)  
  118.         validationRmse = computeRmse(model, validation, numValidation)  
  119.         print "RMSE (validation) = %f for the model trained with " % validationRmse + "rank = %d, lambda = %.4f, and numIter = %d." % (rank, lmbda, numIter)  
  120.         if (validationRmse < bestValidationRmse):  
  121.             bestModel = model  
  122.             bestValidationRmse = validationRmse  
  123.             bestRank = rank  
  124.             bestLambda = lmbda  
  125.             bestNumIter = numIter  
  126.   
  127.   
  128.   
  129.     # evaluate the best model on the test set  
  130.     print "The best model was trained with rank = %d and lambda = %.4f, and numIter = %d ,and Rmse %.4f" % (bestRank, bestLambda,bestNumIter,bestValidationRmse)  
  131.       
  132.     #exit()  
  133.     #通过计算得到rank = 10   lambda = 0.45  numIter = 20 结果最好  
  134.   
  135.     bestModel =  ALS.train(training, 10200.45);  
  136.     # training, validation, test are all RDDs of (userId, movieId, rating)  
  137.   
  138.     #make personalized recommendations  
  139.     #排除该用户已评价过的电影  
  140.      
  141.     testdata = training.filter(lambda x: x[0] not in myRatedMovieIds).map(lambda p: (int(p[0]), int(p[1])))  
  142.   
  143.   
  144.     predictions = bestModel.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))  
  145.     #对预测结果按分值排序 取前5  
  146.     recommendations = predictions.sortBy(lambda x:x[1],False).take(5)  
  147.      
  148.     print "Movies recommended for you:"  
  149.     for i in xrange(len(recommendations)):  
  150.         print ("%2d: %s %s" % (i + 1, recommendations[i][0],recommendations[i][1]))  
  151.   
  152.     # clean up  
  153.     sc.stop() 
数据集采用MovieLens
代码参考https://github.com/databricks/spark-training/blob/master/machine-learning/python/solution/MovieLensALS.py

personalRatingsFile
0::1::?::1400000000::Toy Story (1995)
0::780::?::1400000000::Independence Day (a.k.a. ID4) (1996)
0::590::?::1400000000::Dances with Wolves (1990)
0::1210::?::1400000000::Star Wars: Episode VI - Return of the Jedi (1983)
0::648::?::1400000000::Mission: Impossible (1996)
0::344::?::1400000000::Ace Ventura: Pet Detective (1994)
0::165::?::1400000000::Die Hard: With a Vengeance (1995)
0::153::?::1400000000::Batman Forever (1995)
0::597::?::1400000000::Pretty Woman (1990)
0::1580::?::1400000000::Men in Black (1997)
0::231::?::1400000000::Dumb & Dumber (1994)
阅读(2063) | 评论(1) | 转发(2) |
0

上一篇:Scala ListBuffer使用需显式声明强类型

下一篇:没有了

给主人留下些什么吧!~~

136777462312017-09-09 21:25:42

文明上网,理性发言...

评论热议
请登录后评论。

登录 注册