# create the Spark context and session (conf is assumed to be a SparkConf built earlier)
import pyspark
from pyspark.sql import SparkSession
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
Load the training and test sets:
training = spark.read.option("sep", "\t").csv("MovieLens.training", header=False, schema=schema_ratings)
test = spark.read.option("sep", "\t").csv("MovieLens.test", header=False, schema=schema_ratings)
items = spark.read.option("sep", "|").csv("MovieLens.item", header=False, schema=schema_items)
training
training.printSchema()
root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
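For reference, a schema_ratings consistent with the printed schema could be defined as follows (a sketch; only the field names and types shown above are assumed, and schema_items would be built the same way with item_id and the movie title):
from pyspark.sql.types import StructType, StructField, IntegerType
schema_ratings = StructType([
    StructField('user_id', IntegerType()),
    StructField('item_id', IntegerType()),
    StructField('rating', IntegerType()),
    StructField('timestamp', IntegerType()),
])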
The items DataFrame holds each item_id and the corresponding movie title:
+-------+--------------------+
|item_id| movie|
+-------+--------------------+
| 1| Toy Story (1995)|
| 2| GoldenEye (1995)|
| 3| Four Rooms (1995)|
| 4| Get Shorty (1995)|
| 5| Copycat (1995)|
| 6|Shanghai Triad (Y...|
| 7|Twelve Monkeys (1...|
| 8| Babe (1995)|
| 9|Dead Man Walking ...|
| 10| Richard III (1995)|
| 11|Seven (Se7en) (1995)|
| 12|Usual Suspects, T...|
| 13|Mighty Aphrodite ...|
| 14| Postino, Il (1994)|
| 15|Mr. Holland's Opu...|
| 16|French Twist (Gaz...|
| 17|From Dusk Till Da...|
| 18|White Balloon, Th...|
| 19|Antonia's Line (1...|
| 20|Angels and Insect...|
+-------+--------------------+
Train with the ALS algorithm:
from pyspark.ml.recommendation import ALS
# rank is the number of latent factors; coldStartStrategy='drop' removes NaN predictions for users/items unseen in training
als = ALS(rank=10, maxIter=10, regParam=0.1, userCol='user_id', itemCol='item_id', ratingCol='rating', coldStartStrategy='drop')
models = als.fit(training)
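Once fitted, the model holds the learned latent factor matrices, which can be inspected directly; this is an optional check, not a step from the original walkthrough:
# each row pairs a user (or item) id with its rank-10 latent factor vector
models.userFactors.show(3, truncate=False)
models.itemFactors.show(3, truncate=False)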
Evaluate with a RegressionEvaluator:
from pyspark.ml.evaluation import RegressionEvaluator
predictions = models.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
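To report the metric and spot-check a few predictions (a small follow-up, not part of the original code):
print('RMSE on the test set =', rmse)
predictions.select('user_id', 'item_id', 'rating', 'prediction').show(5)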
Top-k recommendation: get the top-k item_id values and the corresponding predicted ratings.
# one recommendation per user: each row holds user_id and an array of (item_id, rating) structs
top1 = models.recommendForAllUsers(1)
Extract the movie_id:
recommend_item = top1.withColumn('movie_id', top1.recommendations.item_id[0])
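The predicted rating mentioned above can be pulled out of the same struct if it is needed alongside the movie_id (a sketch; the pred_rating column name is an assumption):
recommend_item = recommend_item.withColumn('pred_rating', recommend_item.recommendations.rating[0])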
Find the movie id that is recommended most often:
from pyspark.sql.functions import count, desc
recommend_most = recommend_item.groupby(
    'movie_id'
).agg(
    count('*').alias('counts')
).sort(
    desc('counts')
)
Finally, join with items to get the movie title.
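A minimal sketch of that join, assuming the recommend_most and items DataFrames built above (recommend_named is a hypothetical name):
recommend_named = recommend_most.join(
    items, recommend_most.movie_id == items.item_id
).select('movie_id', 'movie', 'counts')
recommend_named.sort(desc('counts')).show(10)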