> 文档中心 > 离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)

离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)


系列文章目录

  1. 初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)
  2. 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)
  3. 项目主要效果展示——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(三)
  4. 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
  5. 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)
  6. 创建项目并初始化业务数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(六)
  7. 离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)
  8. ……

项目资源下载

  1. 电影推荐系统网站项目源码Github地址(可Fork可Clone)
  2. 电影推荐系统网站项目源码Gitee地址(可Fork可Clone)
  3. 电影推荐系统网站项目源码压缩包下载(直接使用)
  4. 电影推荐系统网站项目源码所需全部工具合集打包下载(spark、kafka、flume、tomcat、azkaban、elasticsearch、zookeeper)
  5. 电影推荐系统网站项目源数据(可直接使用)
  6. 电影推荐系统网站项目个人原创论文
  7. 电影推荐系统网站项目前端代码
  8. 电影推荐系统网站项目前端css代码

文章目录

  • 系列文章目录
  • 项目资源下载
  • 前言
  • 一、离线推荐服务
  • 二、离线统计服务
    • 2.1 历史热门电影统计
    • 2.2 最近热门电影统计
    • 2.3 电影平均得分统计
    • 2.4 类别优质电影统计
  • 三、基于隐语义模型的协同过滤推荐
    • 3.1 用户电影推荐矩阵
    • 3.2 电影相似度矩阵
    • 3.3 模型评估和参数选取
  • 总结

前言

  本节博客的内容十分重要,是对之前理论知识的实践运用,所以涉及到代码的编写,还是那句话,读者一定要注意命名、路径等问题,要把我代码中的相关内容替换为您自己的相关内容。当然,在一些晦涩难懂的理论部分我仍会在此篇博文进行讲解。此博客主要是离线推荐服务的建设,其中包括:离线推荐服务、离线统计服务、基于隐语义模型的协同过滤推荐。并且通过离线推荐我们也可以计算出相关数据,为后面的实时推荐打下基础。另外,要明白为什么离线推荐的算法不能用在实时推荐(因为离线推荐算法速度较慢,而实时推荐对时间有要求),还要理解离线推荐算法的精髓。下面就开始今天的学习吧!


一、离线推荐服务

  离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率
  离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务响应提供数据支撑
  离线推荐服务主要分为统计性算法、基于ALS的协同过滤推荐算法以及基于ElasticSearch的内容推荐算法
  在recommender下新建子项目StatisticsRecommender,pom.xml文件中只需引入Spark、Scala和Mongodb的相关依赖:

<dependencies>        <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId>    </dependency>    <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId>    </dependency>        <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>    </dependency>            <dependency> <groupId>org.mongodb</groupId> <artifactId>casbah-core_2.11</artifactId> <version>${casbah.version}</version>    </dependency>        <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.11</artifactId> <version>${mongodb-spark.version}</version>    </dependency></dependencies>

  在resources文件夹下引入log4j.properties,然后在src/main/scala下新建scala 单例对象com.IronmanJay.statistics.StatisticsRecommender
  同样,应该先建好样例类,在main()方法中定义配置、创建SparkSession并加载数据,最后关闭spark。代码如下:

package com.IronmanJay.statisticsimport java.text.SimpleDateFormatimport java.util.Dateimport org.apache.spark.SparkConfimport org.apache.spark.sql.{DataFrame, SparkSession}case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,   shoot: String, language: String, genres: String, actors: String, directors: String)case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)case class MongoConfig(uri: String, db: String)// 定义一个基准推荐对象case class Recommendation(mid: Int, score: Double)// 定义电影类别top10推荐对象case class GenresRecommendation(genres: String, recs: Seq[Recommendation])object StatisticsRecommender {  // 定义表名  val MONGODB_MOVIE_COLLECTION = "Movie"  val MONGODB_RATING_COLLECTION = "Rating"  //统计的表的名称  val RATE_MORE_MOVIES = "RateMoreMovies"  val RATE_MORE_RECENTLY_MOVIES = "RateMoreRecentlyMovies"  val AVERAGE_MOVIES = "AverageMovies"  val GENRES_TOP_MOVIES = "GenresTopMovies"  def main(args: Array[String]): Unit = {    val config = Map(      "spark.cores" -> "local[*]",      "mongo.uri" -> "mongodb://linux:27017/recommender",      "mongo.db" -> "recommender"    )    // 创建一个sparkConf    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommeder")    // 创建一个SparkSession    val spark = SparkSession.builder().config(sparkConf).getOrCreate()    import spark.implicits._    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))    // 从mongodb加载数据    val ratingDF = spark.read      .option("uri", mongoConfig.uri)      .option("collection", MONGODB_RATING_COLLECTION)      .format("com.mongodb.spark.sql")      .load()      .as[Rating]      .toDF()    val movieDF = spark.read      .option("uri", mongoConfig.uri)      .option("collection", MONGODB_MOVIE_COLLECTION)      .format("com.mongodb.spark.sql")      .load()      .as[Movie]      .toDF()    // 创建名为ratings的临时表    ratingDF.createOrReplaceTempView("ratings")// TODO:不同的统计推荐结果spark.stop()}

二、离线统计服务

2.1 历史热门电影统计

  根据所有历史评分数据,计算历史评分次数最多的电影
  实现思路:通过Spark SQL读取评分数据集,统计所有评分中评分数最多的电影,然后按照从大到小排序,将最终结果写入MongoDB的RateMoreMovies数据集中

// 1. 历史热门统计,历史评分数据最多,mid,countval rateMoreMoviesDF = spark.sql("select mid, count(mid) as count from ratings group by mid")// 把结果写入对应的mongodb表中storeDFInMongoDB(rateMoreMoviesDF, RATE_MORE_MOVIES)def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {  df.write    .option("uri", mongoConfig.uri)    .option("collection", collection_name)    .mode("overwrite")    .format("com.mongodb.spark.sql")    .save()}

2.2 最近热门电影统计

  根据评分,按月为单位计算最近时间的月份里面评分数最多的电影集合
  实现思路:通过Spark SQL读取评分数据集,通过UDF函数将评分数据时间修改为月,然后统计每月电影的评分数。统计完成之后将数据写入到MongoDB的RateMoreRecentlyMovies数据集中

// 2. 近期热门统计,按照“yyyyMM”格式选取最近的评分数据,统计评分个数// 创建一个日期格式化工具val simpleDateFormat = new SimpleDateFormat("yyyyMM")// 注册udf,把时间戳转换成年月格式spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)// 对原始数据做预处理,去掉uidval ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth from ratings")ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")// 从ratingOfMonth中查找电影在各个月份的评分,mid,count,yearmonthval rateMoreRecentlyMoviesDF = spark.sql("select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid order by yearmonth desc, count desc")// 存入mongodbstoreDFInMongoDB(rateMoreRecentlyMoviesDF, RATE_MORE_RECENTLY_MOVIES)

2.3 电影平均得分统计

  根据历史数据中所有用户对电影的评分,周期性的计算每个电影的平均得分
  实现思路:通过Spark SQL读取保存在MongDB中的Rating数据集,通过执行以下SQL语句实现对于电影的平均分统计

// 3. 优质电影统计,统计电影的平均评分,mid,avgval averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")// 存入mongodbstoreDFInMongoDB(averageMoviesDF, AVERAGE_MOVIES)

  统计完成之后将生成的新的DataFrame写入到MongoDB的AverageMoviesScore集合中

2.4 类别优质电影统计

  根据提供的所有电影类别, 分别计算每种类型的电影集合中评分最高的 10 个电影
  实现思路:在计算完整个电影的平均得分之后,将影片集合与电影类型做笛卡尔积,然后过滤掉电影类型不符合的条目,将DataFrame 输出到 MongoDB 的 GenresTopMovies
集合中

// 4. 各类别电影Top统计// 定义所有类别val genres = List("Action", "Adventure", "Animation", "Comedy", "Crime", "Documentary", "Drama", "Family", "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery"  , "Romance", "Science", "Tv", "Thriller", "War", "Western")// 把平均评分加入movie表里,加一列,inner joinval movieWithScore = movieDF.join(averageMoviesDF, "mid")// 为做笛卡尔积,把genres转成rddval genresRDD = spark.sparkContext.makeRDD(genres)// 计算类别top10,首先对类别和电影做笛卡尔积val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd)  .filter {    // 条件过滤,找出movie的字段genres值(Action|Adventure|Sci-Fi)包含当前类别genre(Action)的那些    case (genre, movieRow) => movieRow.getAs[String]("genres").toLowerCase.contains(genre.toLowerCase)  }  .map {    case (genre, movieRow) => (genre, (movieRow.getAs[Int]("mid"), movieRow.getAs[Double]("avg")))  }  .groupByKey()  .map {    case (genre, items) => GenresRecommendation(genre, items.toList.sortWith(_._2 > _._2).take(10).map(item => Recommendation(item._1, item._2)))  }  .toDF()// 存入mongodbstoreDFInMongoDB(genresTopMoviesDF, GENRES_TOP_MOVIES)

三、基于隐语义模型的协同过滤推荐

  项目采用ALS作为协同过滤算法,分别根据MongoDB中的用户评分表和电影数据集计算用户电影推荐矩阵以及电影相似度矩阵

3.1 用户电影推荐矩阵

  通过ALS训练出来的Model来计算所有当前用户电影的推荐矩阵,主要思路如下:
  ①:UserId和MovieId做笛卡尔积,产生(uid,mid)的元组
  ②:通过模型预测(uid,mid)的元组
  ③:将预测结果通过预测分值进行排序
  ③:返回分值最大的K个电影,作为当前用户的推荐列表
  最后生成的数据结构如下,并将数据保存到MongoDB的UserRecs表中
在这里插入图片描述
  新建recommender的子项目OfflineRecommender,引入Spark、Scala、Mongodb和Jblas的依赖如下:

<dependencies>    <dependency> <groupId>org.scalanlp</groupId> <artifactId>jblas</artifactId> <version>${jblas.version}</version>    </dependency>        <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId>    </dependency>    <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId>    </dependency>    <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId>    </dependency>        <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId>    </dependency>            <dependency> <groupId>org.mongodb</groupId> <artifactId>casbah-core_2.11</artifactId> <version>${casbah.version}</version>    </dependency>        <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.11</artifactId> <version>${mongodb-spark.version}</version>    </dependency></dependencies>

  同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。在src/main/scala/com.IronmanJay.offline/OfflineRecommender.scala中的核心代码如下:

package com.IronmanJay.offlineimport org.apache.spark.SparkConfimport org.apache.spark.mllib.recommendation.{ALS, Rating}import org.apache.spark.sql.SparkSessionimport org.jblas.DoubleMatrix// 基于评分数据的LFM,只需要rating数据case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)case class MongoConfig(uri: String, db: String)// 定义一个基准推荐对象case class Recommendation(mid: Int, score: Double)// 定义基于预测评分的用户推荐列表case class UserRecs(uid: Int, recs: Seq[Recommendation])// 定义基于LFM电影特征向量的电影相似度列表case class MovieRecs(mid: Int, recs: Seq[Recommendation])object OfflineRecommender {  // 定义表名和常量  val MONGODB_RATING_COLLECTION = "Rating"  val USER_RECS = "UserRecs"  val MOVIE_RECS = "MovieRecs"  val USER_MAX_RECOMMENDATION = 20  def main(args: Array[String]): Unit = {    val config = Map(      "spark.cores" -> "local[*]",      "mongo.uri" -> "mongodb://linux:27017/recommender",      "mongo.db" -> "recommender"    )    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")    // 创建一个SparkSession    val spark = SparkSession.builder().config(sparkConf).getOrCreate()    import spark.implicits._    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))    // 加载数据    val ratingRDD = spark.read      .option("uri", mongoConfig.uri)      .option("collection", MONGODB_RATING_COLLECTION)      .format("com.mongodb.spark.sql")      .load()      .as[MovieRating]      .rdd      .map(rating => (rating.uid, rating.mid, rating.score)) // 转化成rdd,并且去掉时间戳      .cache()    // 从rating数据中提取所有的uid和mid,并去重    val userRDD = ratingRDD.map(_._1).distinct()    val movieRDD = ratingRDD.map(_._2).distinct()    // 训练隐语义模型    val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))    val (rank, iterations, lambda) = (200, 5, 0.1)    val model = ALS.train(trainData, rank, iterations, lambda)    // 基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表    // 计算user和movie的笛卡尔积,得到一个空评分矩阵    val userMovies = userRDD.cartesian(movieRDD)    // 调用model的predict方法预测评分    val preRatings = model.predict(userMovies)    val userRecs = preRatings      .filter(_.rating > 0) // 过滤出评分大于0的项      .map(rating => (rating.user, (rating.MOVIE, rating.rating)))      .groupByKey()      .map { case (uid, recs) => UserRecs(uid, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))      }      .toDF()    userRecs.write      .option("uri", mongoConfig.uri)      .option("collection", USER_RECS)      .mode("overwrite")      .format("com.mongodb.spark.sql")      .save()    // TODO:计算电影相似度矩阵spark.stop()}

3.2 电影相似度矩阵

  通过ALS计算电影相似度矩阵,该矩阵用于查询当前电影的相似电影并为实时推荐系统服务
  离线计算的ALS算法最终会为用户、电影分别生成最终的特征矩阵,分别是表示用户特征矩阵的 U ( m × k ) U(m×k) U(m×k)矩阵,每个用户由 k k k个特征描述;表示物品特征矩阵的 V ( n × k ) V(n×k) V(n×k)矩阵,每个物品也由 k k k个特征描述
   V ( n × k ) V(n×k) V(n×k)表示物品特征矩阵,每一行是一个 k k k维向量,虽然并不知道每一个维度的特征意义是什么,但是 k k k个维度的数学向量表示了该行对应电影的特征
  所以,每个电影用 V ( n × k ) V(n×k) V(n×k)每一行的 <t1,t2,t3,tk>向量表示其特征,于是任意两个电影p:特征向量为 <Vp=tp1,tp2,tp3,tpk>,电影q:特征向量为 <Vq=tq1,tq2,tq3,tqk>,并且之间的相似度 S i m ( p , q ) Sim(p,q) Sim(p,q)可以使用 V p V_p Vp V q V_q Vq的余弦值来表示:
S i m ( p , q ) = ∑ i = 0 k( t p i × t q i ) ∑ i = 0 k t p i 2 ×∑ i = 0 k t q i 2 Sim(p,q)=\frac{\sum_{i=0}^{k}(t_{pi}\times t_{qi})}{\sqrt{\sum_{i=0}^{k}t_{pi}^2}\times \sqrt{\sum_{i=0}^{k}t_{qi}^2}} Sim(p,q)=i=0ktpi2 ×i=0ktqi2 i=0k(tpi×tqi)
  数据集中任意两个电影间的相似度都可以由公式计算得到,电影与电影之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的MOVIERecs表中
离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)
  核心代码如下:

// 基于电影隐特征,计算相似度矩阵,得到电影的相似度列表val movieFeatures = model.MOVIEFeatures.map {  case (mid, features) => (mid, new DoubleMatrix(features))}// 对所有电影两两计算它们的相似度,先做笛卡尔积val movieRecs = movieFeatures.cartesian(movieFeatures)  .filter {    // 把自己跟自己的配对过滤掉    case (a, b) => a._1 != b._1  }  .map {    case (a, b) => {      val simScore = this.consinSim(a._2, b._2)      (a._1, (b._1, simScore))    }  }  .filter(_._2._2 > 0.6) // 过滤出相似度大于0.6的  .groupByKey()  .map {    case (mid, items) => MovieRecs(mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))  }  .toDF()movieRecs.write  .option("uri", mongoConfig.uri)  .option("collection", MOVIE_RECS)  .mode("overwrite")  .format("com.mongodb.spark.sql")  .save()

  其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:

// 求向量余弦相似度def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {  movie1.dot(movie2) / (movie1.norm2() * movie2.norm2())}

3.3 模型评估和参数选取

  在上述模型训练的过程中,直接给定了隐语义模型的rank、iterations、lambda三个参数。对于模型来说这并不一定是最优的参数选取,所以需要对模型进行评估。通常的做法是计算均方根误差( R M S E RMSE RMSE),考察预测评分与实际评分之间的误差
R M S E = 1 N ∑ t = 1 N( o b s e r v e d t− p r e d i c t e d t ) 2 RMSE=\sqrt{\frac{1}{N}\sum_{t=1}^{N}(observed_t-predicted_t)^2} RMSE=N1t=1N(observedtpredictedt)2
  有了 R M S E RMSE RMSE,就可以通过多次调整参数值,来选取 R M S E RMSE RMSE最小的一组作为模型的优化选择
  在scala/com.IronmanJay.offline/下新建单例对象ALSTrainer,代码主体架构如下:

package com.IronmanJay.offlineimport breeze.numerics.sqrtimport com.IronmanJay.offline.OfflineRecommender.MONGODB_RATING_COLLECTIONimport org.apache.spark.SparkConfimport org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}import org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSessionobject ALSTrainer {  def main(args: Array[String]): Unit = {    val config = Map(      "spark.cores" -> "local[*]",      "mongo.uri" -> "mongodb://linux:27017/recommender",      "mongo.db" -> "recommender"    )    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")    // 创建一个SparkSession    val spark = SparkSession.builder().config(sparkConf).getOrCreate()    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))    import spark.implicits._    // 加载评分数据    val ratingRDD = spark.read      .option("uri", mongoConfig.uri)      .option("collection", MONGODB_RATING_COLLECTION)      .format("com.mongodb.spark.sql")      .load()      .as[MovieRating]      .rdd      .map(rating => Rating(rating.uid, rating.mid, rating.score)) // 转化成rdd,并且去掉时间戳      .cache()    // 随机切分数据集,生成训练集和测试集    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))    val trainingRDD = splits(0)    val testRDD = splits(1)    // 模型参数选择,输出最优参数    adjustALSParam(trainingRDD, testRDD)    spark.close()  }}

  其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。代码实现如下:

// 输出最优参数def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating]): Unit = {  val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1))    yield {      val model = ALS.train(trainData, rank, 5, lambda)      // 计算当前参数对应模型的rmse,返回Double      val rmse = getRMSE(model, testData)      (rank, lambda, rmse)    }  // 控制台打印输出最优参数  println(result.minBy(_._3))}

  计算 R M S E RMSE RMSE的函数getRMSE代码实现如下:

def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {  // 计算预测评分  val userMOVIEs = data.map(item => (item.user, item.MOVIE))  val predictRating = model.predict(userMOVIEs)  // 以uid,mid作为外键,inner join实际观测值和预测值  val observed = data.map(item => ((item.user, item.MOVIE), item.rating))  val predict = predictRating.map(item => ((item.user, item.MOVIE), item.rating))  // 内连接得到(uid, mid),(actual, predict)  sqrt(    observed.join(predict).map {      case ((uid, mid), (actual, pre)) => val err = actual - pre err * err    }.mean()  )}

  运行代码,就可以得到目前数据的最优模型参数


总结

  此篇博客也宣告结束啦,这篇博文的代码相对于来说有点多,所以需要读者仔细阅读,要记住这篇博文写的代码不仅仅是作为离线推荐服务使用,在后面的实时推荐服务部分也有需求,所以这篇博客很重要。下篇博文将为大家带来实时推荐服务,那就下篇博客见啦!