Python代码:
-
#!/usr/bin/env python
-
# coding=utf-8
-
'''
Usage: /yourpath/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile

movieLensDataDir     directory containing the MovieLens rating data set, e.g. ml-1m/
personalRatingsFile  ratings given by the user to generate recommendations for;
                     same format as ratings.dat
'''
-
import sys
-
import itertools
-
from math import sqrt
-
from operator import add
-
from os.path import join, isfile, dirname
-
-
from pyspark import SparkConf, SparkContext
-
from pyspark.mllib.recommendation import ALS
-
-
def parseRating(line):
    """
    Parse one rating record in MovieLens format
    ``userId::movieId::rating::timestamp``.

    Returns a pair ``(timestamp % 10, (userId, movieId, rating))`` where
    userId and movieId are ints and rating is a float. Fields after the
    fourth one are ignored.

    Raises IndexError if the line has fewer than four ``::``-separated
    fields and ValueError if a field is not numeric.
    """
    fields = line.strip().split("::")
    # int() rather than long(): same value on Python 2 (int promotes to long
    # automatically) and also defined on Python 3, where long() is gone.
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))
-
-
def parseMovie(line):
    """
    Split a MovieLens movie record (``movieId::movieTitle``) into a
    ``(movieId, movieTitle)`` pair.

    The id is converted to int, the title is returned as-is, and any
    further ``::``-separated fields are ignored.
    """
    parts = line.strip().split("::")
    movieId = int(parts[0])
    title = parts[1]
    return movieId, title
-
-
def loadRatings(ratingsFile):
    """
    Load the personal ratings stored in ``ratingsFile``.

    Every non-blank line is parsed with parseRating(); only the
    ``(userId, movieId, rating)`` part is kept, and entries whose rating is
    not greater than 0 are discarded.

    Returns a non-empty list of ``(userId, movieId, rating)`` tuples.
    Prints a message and exits the process with status 1 when the file does
    not exist or contains no usable rating.
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)

    # The with-block closes the file even when a line fails to parse.
    # Blank lines (e.g. a trailing newline) are skipped instead of crashing
    # parseRating with an IndexError.
    with open(ratingsFile, 'r') as f:
        parsed = [parseRating(line)[1] for line in f if line.strip()]

    # Build a real list: on Python 3 filter() returns a lazy iterator that is
    # always truthy, which would make the emptiness check below useless.
    ratings = [r for r in parsed if r[2] > 0]

    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings
-
-
def computeRmse(model, data, n):
    """
    Compute the RMSE (Root Mean Squared Error) of ``model`` on ``data``.

    ``data`` is indexed as (user, product, rating) records. The model's
    predictions for every (user, product) pair are joined with the actual
    ratings on that pair, the squared differences are summed, divided by
    ``n`` and square-rooted.
    """
    userProducts = data.map(lambda rec: (rec[0], rec[1]))
    predicted = model.predictAll(userProducts).map(lambda p: ((p[0], p[1]), p[2]))
    actual = data.map(lambda rec: ((rec[0], rec[1]), rec[2]))

    # Each joined value is a (predicted rating, actual rating) pair.
    ratingPairs = predicted.join(actual).values()
    squaredErrorSum = ratingPairs.map(lambda pair: (pair[0] - pair[1]) ** 2).reduce(add)
    return sqrt(squaredErrorSum / float(n))
-
-
if __name__ == "__main__":
    # Driver: train ALS models on the MovieLens ratings plus one user's
    # personal ratings, report the validation RMSE of each hyper-parameter
    # combination, then print the top 5 movies recommended to that user.
    if len(sys.argv) != 3:
        print("Usage: /path/to/spark/bin/spark-submit --driver-memory 1g MovieLensALS.py movieLensDataDir personalRatingsFile")
        sys.exit(1)

    movieLensHomeDir = sys.argv[1]

    # Load the personal ratings first: loadRatings() exits the process on a
    # missing/empty file, and doing so before the SparkContext exists avoids
    # leaving a started context behind.
    myRatings = loadRatings(sys.argv[2])
    myRatedMovieIds = {x[1] for x in myRatings}
    # NOTE(review): assumes every line of the personal ratings file carries
    # the same userId (as in the sample file) -- the first one is used.
    myUserId = myRatings[0][0]

    # set up environment
    conf = SparkConf().setAppName("MovieLensALS").set("spark.executor.memory", "1g")
    sc = SparkContext(conf=conf)

    try:
        myRatingsRDD = sc.parallelize(myRatings, 1)

        # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
        ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(parseRating)

        # movies is a local dict of movieId -> movieTitle
        movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(parseMovie).collect())

        numRatings = ratings.count()
        numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
        numMovies = ratings.values().map(lambda r: r[1]).distinct().count()

        print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))

        # Split ratings into training and validation sets by the last digit
        # of the timestamp: digits 0-7 go to training (together with the
        # personal ratings), digits 8-9 go to validation. Both are cached
        # RDDs of (userId, movieId, rating).
        numPartitions = 4
        training = ratings.filter(lambda x: x[0] < 8).values().union(myRatingsRDD).repartition(numPartitions).cache()
        validation = ratings.filter(lambda x: 8 <= x[0] < 10).values().repartition(numPartitions).cache()

        numTraining = training.count()
        numValidation = validation.count()

        print("Training: %d, validation: %d" % (numTraining, numValidation))

        # train models and evaluate them on the validation set
        ranks = [10, 12]
        lambdas = [0.01, 0.4, 1.0]
        numIters = [10]
        bestModel = None
        bestValidationRmse = float("inf")
        bestRank = 0
        bestLambda = -1.0
        bestNumIter = -1

        for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
            model = ALS.train(training, rank, numIter, lmbda)
            validationRmse = computeRmse(model, validation, numValidation)
            print("RMSE (validation) = %f for the model trained with " % validationRmse + "rank = %d, lambda = %.4f, and numIter = %d." % (rank, lmbda, numIter))
            if validationRmse < bestValidationRmse:
                bestModel = model
                bestValidationRmse = validationRmse
                bestRank = rank
                bestLambda = lmbda
                bestNumIter = numIter

        # report the combination with the lowest validation RMSE
        print("The best model was trained with rank = %d and lambda = %.4f, and numIter = %d ,and Rmse %.4f" % (bestRank, bestLambda, bestNumIter, bestValidationRmse))

        # The author found rank = 10, lambda = 0.45, numIter = 20 to give the
        # best result, so the final model is retrained with those values.
        # NOTE(review): this replaces the model selected by the grid search
        # above; the RMSE printed above does not describe this model.
        bestModel = ALS.train(training, 10, 20, 0.45)

        # Make personalized recommendations: candidates are the movies that
        # appear in the training data and that the user has not rated yet,
        # each paired with the personal user's id.
        candidates = (training.map(lambda r: r[1])
                      .distinct()
                      .filter(lambda movieId: movieId not in myRatedMovieIds)
                      .map(lambda movieId: (myUserId, movieId)))

        predictions = bestModel.predictAll(candidates).map(lambda r: ((r[0], r[1]), r[2]))

        # sort the predictions by predicted rating, descending, and keep the top 5
        recommendations = predictions.sortBy(lambda x: x[1], False).take(5)

        print("Movies recommended for you:")
        for i, ((_, movieId), score) in enumerate(recommendations):
            # Fall back to the raw id when movies.dat has no title for it.
            print("%2d: %s %s" % (i + 1, movies.get(movieId, movieId), score))
    finally:
        # clean up, also when any of the steps above fails
        sc.stop()
数据集采用MovieLens
代码参考
personalRatingsFile 示例(运行前请将每行的 ? 替换为你的评分;评分不大于 0 的行会被忽略):
0::1::?::1400000000::Toy Story (1995)
0::780::?::1400000000::Independence Day (a.k.a. ID4) (1996)
0::590::?::1400000000::Dances with Wolves (1990)
0::1210::?::1400000000::Star Wars: Episode VI - Return of the Jedi (1983)
0::648::?::1400000000::Mission: Impossible (1996)
0::344::?::1400000000::Ace Ventura: Pet Detective (1994)
0::165::?::1400000000::Die Hard: With a Vengeance (1995)
0::153::?::1400000000::Batman Forever (1995)
0::597::?::1400000000::Pretty Woman (1990)
0::1580::?::1400000000::Men in Black (1997)
0::231::?::1400000000::Dumb & Dumber (1994)
阅读(9789) | 评论(1) | 转发(2) |