实时推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(八)
系列文章目录
- 初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)
- 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)
- 项目主要效果展示——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(三)
- 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
- 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)
- 创建项目并初始化业务数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(六)
- 离线推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(七)
- 实时推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(八)
- ……
项目资源下载
- 电影推荐系统网站项目源码Github地址(可Fork可Clone)
- 电影推荐系统网站项目源码Gitee地址(可Fork可Clone)
- 电影推荐系统网站项目源码压缩包下载(直接使用)
- 电影推荐系统网站项目源码所需全部工具合集打包下载(spark、kafka、flume、tomcat、azkaban、elasticsearch、zookeeper)
- 电影推荐系统网站项目源数据(可直接使用)
- 电影推荐系统网站项目个人原创论文
- 电影推荐系统网站项目前端代码
- 电影推荐系统网站项目前端css代码
文章目录
- 系列文章目录
- 项目资源下载
- 前言
- 一、实时推荐服务需求
- 二、实时推荐算法设计
- 三、实时推荐算法实现
-
- 3.1 获取用户的K次最近评分
- 3.2 获取当前电影最相似的K个电影
- 3.3 电影推荐优先级计算
- 3.4 将结果保存到 M o n g o D BMongoDB MongoDB
- 3.5 更新实时推荐结果
- 四、实时系统联调
-
- 4.1 启动实时系统的基本组件
- 4.2 启动 Z o o k e e p e rZookeeper Zookeeper
- 4.3 启动 K a f k aKafka Kafka
- 4.4 启动 K a f k a S t r e a m i n gKafka Streaming KafkaStreaming程序
- 4.5 配置并启动 F l u m eFlume Flume
- 4.6 启动业务系统后台
- 总结
前言
这篇博文算是整个系列的一个重点,是利用离线推荐服务的得到的数据进行实时推荐,并利用 K a f k a Kafka Kafka、 F l u m e Flume Flume等进行实时流传输,其中不仅包括代码还包括一些配置,需要读者对Linux系统比较熟悉,因为大部分都是在Linux系统上进行操作。本篇博文分四部分,分别是:实时推荐服务需求、实时推荐算法设计、实时推荐算法实现、实时系统联调。下面就开始今天的学习吧!
一、实时推荐服务需求
实时计算与离线计算应用于推荐系统上最大的不同在于实时计算推荐结果应该是反应最近一段时间用户近期的偏好,而离线计算推荐结果则是根据用户从第一次评分起的所有评分记录来计算用户总体的偏好
用户对物品的偏好随着时间的推移总是会改变的。比如一个用户 u u u在某时刻对电影 p p p给予了极高的评分,那么在近期一段时间内, u u u极有可能很喜欢与电影 p p p类似的其他电影;而如果用户 u u u在某时刻对电影 q q q给予了极低的评分,那么在近期一段时间, u u u极有可能不喜欢与电影 q q q类似的其他电影。所以对于实时推荐,当用户对一个电影进行了评价之后,用户会希望推荐结果基于最近这几次评分进行一定的更新,使得推荐结果匹配用户近期的偏好,满足用户近期的口味
如果实时推荐继续采用离线推荐中的 A L S ALS ALS算法,由于算法运行时间巨大,不具有实时更新得到新的推荐结果的能力。并且由于算法本身使用的是评分表,用户本次评分之后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分之前的推荐结果基本没有多少变化,从而给用户一种推荐结果一直没变化的感觉,很影响用户体验
另外,在实时推荐中由于时间性能上要满足实时或者准实时的要求,所以算法的计算量不能太大,避免复杂、过多的计算造成用户体验的下降。鉴于此,推荐精度往往不会很高。实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理即可,至于推荐精度的要求可以适当放宽
所以对于实时推荐算法,主要有两点需求:
①:用户本次评分后、或最近几个评分后系统可以明显的更新推荐结果
②:计算量不大,满足响应时间上的实时或者准实时要求
二、实时推荐算法设计
当用户 u u u对电影 p p p进行了评分,将触发一次对 u u u的推荐结果的更新。由于用户 u u u对电影 p p p评分,对于用户 u u u来说,他与 p p p最相似的电影们之间的推荐强度将发生变化,所以选取与电影 p p p最相似的 K K K个电影作为侯选电影
每个侯选电影按照“推荐优先级”这一权重作为衡量这个电影被推荐给用户 u u u的优先级
这些电影将根据用户 u u u最近的若干评分计算出各自对用户 u u u的推荐优先级,然后与上次对用户 u u u的实时推荐结果进行基于推荐优先级的合并、替换得到更新后的推荐结果
具体来说:首先获取用户 u u u按时间顺序最近的 K K K个评分,记为 R K RK RK;获取电影 p p p的最相似的 K K K个电影集合,记为 S S S;然后对于每个电影 q ∈ S q \in S q∈S,计算推荐优先级 Euq E_{uq} Euq,计算公式如下:
E u q = ∑ r ∈ R K s i m ( q , r ) ∗ R r s i m _ s u m + l g m a x { i n c o u n t , 1 } − l g m a x { r e c o u n t , 1 } E_{uq}=\frac{\sum_{r\in{RK}}sim(q,r)*R_{r}}{sim\_sum}+lgmax\{incount,1\}-lgmax\{recount,1\} Euq=sim_sum∑r∈RKsim(q,r)∗Rr+lgmax{incount,1}−lgmax{recount,1}
其中的参数意义如下:
- Rr R_r Rr表示用户u u u对电影r r r的评分
- s i m ( q , r ) sim(q,r) sim(q,r)表示电影q q q与电影r r r的相似度,设定最小相似度为0.6,当电影q q q和电影r r r相似度低于0.6的阈值,则视为两者不相关并忽略
- s i m _ s u m sim\_sum sim_sum表示q q q与R K RK RK中电影相似度大于最高阈值的个数
- i n c o u n t incount incount表示R K RK RK中与电影q q q相似的、且本身评分较高(≥3)的电影个数
- r e c o u n t recount recount表示R K RK RK中与电影q q q相似的、且本身评分较低(<3)的电影个数
此公式的意义为:首先对于每个候选电影 q q q,从 u u u最近的 K K K个评分中,找出与 q q q相似度较高( ≥ 0.6 ≥0.6 ≥0.6)的 u u u已评分电影,对于这些电影中的每个电影 r r r,将 r r r与 q q q的相似度乘以用户 u u u对 r r r的评分,将这些乘积计算平均数,作为用户 u u u对电影 q q q的评分预测即:
∑ r ∈ R K s i m ( q , r ) ∗ R r s i m _ s u m\frac{\sum_{r\in{RK}}sim(q,r)*R_{r}}{sim\_sum} sim_sum∑r∈RKsim(q,r)∗Rr
然后,将 u u u最近的 K K K个评分中与电影 q q q相似的、且本身评分较高(≥3)的电影个数记为 i n c o u n t incount incount,计算 l g m a x { i n c o u n t , 1 } lgmax \{ incount,1 \} lgmax{incount,1}作为电影 q q q的“增强因子”,意义在于电影 q q q与 u u u的最近 K K K个评分中的 n n n个高评分(≥3)电影相似,则电影 q q q的优先级被增加 l g m a x { i n c o u n t , 1 } lgmax \{ incount,1 \} lgmax{incount,1}。如果电影 q q q与 u u u的最近 K K K个评分中相似的高评分电影越多,也就是说 n n n越大,则电影 q q q更应该被推荐,所以推荐优先级被增强的幅度较大;如果电影 q q q与 u u u的最近 K K K个评分中相似的高评分电影越少,也就是 n n n越小,则推荐优先级被增强的幅度较小
而后,将 u u u最近的 K K K个评分中与电影 q q q相似的、且本身评分较低(<3)的电影个数记为 r e c o u n t recount recount,计算 l g m a x { r e c o u n t , 1 } lgmax \{ recount,1 \} lgmax{recount,1}作为电影 q q q的“削弱因子”,意义在于电影 q q q与 u u u的最近 K K K个评分中的 n n n个低评分(<3)电影相似,则电影 q q q的优先级被削减 l g m a x { r e c o u n t , 1 } lgmax \{ recount,1 \} lgmax{recount,1}。如果电影 q q q与 u u u的最近 K K K个评分中相似的低评分电影越多,也就是说 n n n越大,则电影 q q q更不应该被推荐,所以推荐优先级被减弱的幅度较大;如果电影 q q q与 u u u的最近 K K K个评分中相似的低评分电影越少,也就是 n n n越小,则推荐优先级被减弱的幅度较小
最后,将增强因子增加到上述的预测评分中,并减去削弱因子,得到最终的 q q q电影对于 u u u的推荐优先级。在计算完每个侯选电影 q q q的 Euq E_{uq} Euq后,将生成一组<电影 q q q的ID, q q q的推荐优先级>的列表 u p d a t e d L i s t updatedList updatedList:
u p d a t e d L i s t =⋃ q ∈ S{qID, E u q} updatedList = \bigcup_{q \in S}^{}\left \{ qID,E_{uq} \right \} updatedList=q∈S⋃{qID,Euq}
而在本次为用户 u u u实时推荐之前的上一次实时推荐结果 R e c Rec Rec也是一组<电影 m m m的ID, m m m的推荐优先级>的列表,其大小也为 K K K:
R e c =⋃ m ∈ R e c{mID, E u m} , l e n ( R e c ) = K Rec = \bigcup_{m \in Rec}^{}\left \{ mID,E_{um} \right \},len(Rec)=K Rec=m∈Rec⋃{mID,Eum},len(Rec)=K
接下来,将 u p d a t e d _ S updated\_S updated_S与本次为 u u u实时推荐之前的上一次实时推荐结果 R e c Rec Rec进行合并、替换形成新的推荐结果 N e w R e c NewRec NewRec:
N e w R e c = t o p K ( i ∈ R e c ∪ u p d a t e d L i s t , c m p =E u i ) NewRec = topK(i \in Rec \cup updatedList,cmp=E_{ui}) NewRec=topK(i∈Rec∪updatedList,cmp=Eui)
其中, i i i表示 u p d a t e d _ S updated\_S updated_S与 R e c Rec Rec的电影集合中的每个电影, t o p K topK topK是一个函数,表示从 R e c ∪ u p d a t e d _ S Rec \cup updated\_S Rec∪updated_S中选择出最大的 K K K个电影, c m p = Eui cmp = E_{ui} cmp=Eui表示 t o p K topK topK函数将推荐优先级 Eui E_{ui} Eui值最大的 K K K个电影选出来。最终, N e w R e c NewRec NewRec即为经过用户 u u u对电影 p p p评分后触发的实时推荐得到的最新推荐结果
总之,实时推荐算法流程基本如下:
- 用户u u u对电影p p p进行了评分,触发了实时推荐的一次计算
- 选出电影p p p最相似的K K K个电影作为集合S S S
- 获取用户u u u最近时间内的K K K条评分,包含本次评分,作为集合R K RK RK
- 计算电影的推荐优先级,产生集合u p d a t e d _ S updated\_S updated_S
将 u p d a t e d _ S updated\_S updated_S与上次对用户 u u u的推荐结果利用上述公式进行合并,产生新的推荐结果 N e w R e c NewRec NewRec,作为最终输出
在recommender下新建子项目StreamingRecommender,引入Spark、Scalc、MongoDB、Redis和Kafka的依赖:
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> <dependency> <groupId>org.mongodb</groupId> <artifactId>casbah-core_2.11</artifactId> <version>${casbah.version}</version> </dependency> <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.11</artifactId> <version>${mongodb-spark.version}</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>${spark.version}</version> </dependency></dependencies>
代码中首先定义样例类和一个连接助手对象(用于建立Redis和MongoDB连接),并在src/main/scala/com.IronmanJay.streaming/StreamingRecommender.scala中定义一些常量:
package com.IronmanJay.streamingimport com.mongodb.casbah.commons.MongoDBObjectimport com.mongodb.casbah.{MongoClient, MongoClientURI}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import redis.clients.jedis.Jedis// 定义连接助手对象,序列化object ConnHelper extends Serializable { lazy val jedis = new Jedis("linux") lazy val mongoClient = MongoClient(MongoClientURI("mongodb://linux:27017/recommender"))}case class MongoConfig(uri: String, db: String)// 定义一个基准推荐对象case class Recommendation(mid: Int, score: Double)// 定义基于预测评分的用户推荐列表case class UserRecs(uid: Int, recs: Seq[Recommendation])// 定义基于LFM电影特征向量的电影相似度列表case class MovieRecs(mid: Int, recs: Seq[Recommendation])object StreamingRecommender { val MAX_USER_RATINGS_NUM = 20 val MAX_SIM_MOVIES_NUM = 20 val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs" val MONGODB_RATING_COLLECTION = "Rating" val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"def main(args: Array[String]): Unit = { }}
实时推荐主体代码如下:
def main(args: Array[String]): Unit = { val config = Map( "spark.cores" -> "local[*]", "mongo.uri" -> "mongodb://linux:27017/recommender", "mongo.db" -> "recommender", "kafka.topic" -> "recommender" ) val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender") // 创建一个SparkSession val spark = SparkSession.builder().config(sparkConf).getOrCreate() // 拿到streaming context val sc = spark.sparkContext val ssc = new StreamingContext(sc, Seconds(2)) // batch duration import spark.implicits._ implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db")) // 加载电影相似度矩阵数据,把它广播出去 val simMovieMatrix = spark.read .option("uri", mongoConfig.uri) .option("collection", MONGODB_MOVIE_RECS_COLLECTION) .format("com.mongodb.spark.sql") .load() .as[MovieRecs] .rdd .map { movieRecs => // 为了查询相似度方便,转换成map (movieRecs.mid, movieRecs.recs.map(x => (x.mid, x.score)).toMap) }.collectAsMap() val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix) // 定义kafka连接参数 val kafkaParam = Map( "bootstrap.servers" -> "linux:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "recommender", "auto.offset.reset" -> "latest" ) // 通过kafka创建一个DStream val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam) ) // 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流 val ratingStream = kafkaStream.map { msg => val attr = msg.value().split("\\|") (attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt) } // 继续做流式处理,核心实时算法部分 ratingStream.foreachRDD { rdds => rdds.foreach { case (uid, mid, score, timestamp) => { println("rating data coming! >>>>>>>>>>>>>>>>") // 1. 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)] val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis) // 2. 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid] val candidateMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value) // 3. 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)] val streamRecs = computeMovieScores(candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value) // 4. 把推荐数据保存到mongodb saveDataToMongoDB(uid, streamRecs) } } } // 开始接收和处理数据 ssc.start() println(">>>>>>>>>>>>>>> streaming started!") ssc.awaitTermination()}
三、实时推荐算法实现
实时推荐算法的前提:
- 在Redis集群中存储了每一个用户最近对电影的K K K次评分,实时算法可以快速获取
- 离线推荐算法已经将电影相似度矩阵提前计算到了MongoDB中
- Kafka已经获取到了用户实时的评分数据
算法过程如下:
实时推荐算法输入为一个评分,而执行的核心内容包括:获取 u s e r I d userId userId最近 K K K次评分、获取 m i d mid mid最相似的 K K K个电影、计算候选电影的推荐优先级、更新对 u s e r I d userId userId的实时推荐结果
3.1 获取用户的K次最近评分
业务服务器在接收用户评分的时候,默认会将该评分情况以 u s e r I d userId userId、 m i d mid mid、 r a t e rate rate、 t i m e s t a m p timestamp timestamp的格式插入到Redis中的该用户对应的队列当中,在实时算法中,只需要通过Redis客户端获取相对应的队列内容即可
// redis操作返回的是java类,为了用map操作需要引入转换类import scala.collection.JavaConversions._def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = { // 从redis读取数据,用户评分数据保存在 uid:UID 为key的队列里,value是 MID:SCORE jedis.lrange("uid:" + uid, 0, num - 1) .map { item => // 具体每个评分又是以冒号分隔的两个值 val attr = item.split("\\:") (attr(0).trim.toInt, attr(1).trim.toDouble) } .toArray}
3.2 获取当前电影最相似的K个电影
在离线算法中,已经预先将电影的相似度矩阵进行了计算,所以每个电影( m i d mid mid)的最相似的 K K K个电影很容易获取:从MongoDB中读取MOVIERecs数据,从 m i d mid mid在 s i m H a s h simHash simHash对应的子哈希表中获取相似度前 K K K大的那些电影。输出是数据类型为Array[Int]的数组,表示与 m i d mid mid最相似的电影集合,并命名为candidateMovies以作为侯选电影集合
/ * 获取跟当前电影做相似的num个电影,作为备选电影 * * @param num相似电影的数量 * @param mid当前电影ID * @param uid当前评分用户ID * @param simMovies 相似度矩阵 * @return 过滤之后的备选电影列表 */def getTopSimMovies(num: Int, mid: Int, uid: Int,simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int, Double]])(implicit mongoConfig: MongoConfig): Array[Int] = { // 1. 从相似度矩阵中拿到所有相似的电影 val allSimMovies = simMovies(mid).toArray // 2. 从mongodb中查询用户已看过的电影 val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION) .find(MongoDBObject("uid" -> uid)) .toArray .map { item => item.get("mid").toString.toInt } // 3. 把看过的过滤,得到输出列表 allSimMovies.filter(x => !ratingExist.contains(x._1)) .sortWith(_._2 > _._2) .take(num) .map(x => x._1)}
3.3 电影推荐优先级计算
对于侯选电影集合 s i m H a s h simHash simHash和 u s e r I d userId userId的最近 K K K个评分recentRatings,算法代码内容如下:
def computeMovieScores(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = { // 定义一个ArrayBuffer,用于保存每一个备选电影的基础得分 val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]() // 定义一个HashMap,保存每一个备选电影的增强减弱因子 val increMap = scala.collection.mutable.HashMap[Int, Int]() val decreMap = scala.collection.mutable.HashMap[Int, Int]() for (candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings) { // 拿到备选电影和最近评分电影的相似度 val simScore = getMoviesSimScore(candidateMovie, userRecentlyRating._1, simMovies) if (simScore > 0.7) { // 计算备选电影的基础推荐得分 scores += ((candidateMovie, simScore * userRecentlyRating._2)) if (userRecentlyRating._2 > 3) { increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1 } else { decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1 } } } // 根据备选电影的mid做groupby,根据公式去求最后的推荐评分 scores.groupBy(_._1).map { // groupBy之后得到的数据 Map( mid -> ArrayBuffer[(mid, score)] ) case (mid, scoreList) => (mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1))) }.toArray.sortWith(_._2 > _._2)}
其中,getMoviesimScore是取侯选电影和已评分电影的相似度,代码如下:
// 获取两个电影之间的相似度def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = { simMovies.get(mid1) match { case Some(sims) => sims.get(mid2) match { case Some(score) => score case None => 0.0 } case None => 0.0 }}
而log是对数运算,这里实现为取10的对数(常用对数):
// 求一个数的对数,利用换底公式,底数默认为10def log(m: Int): Double = { val N = 10 math.log(m) / math.log(N)}
3.4 将结果保存到 M o n g o D B MongoDB MongoDB
saveRecsToMongoDB实现了结果的保存:
def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit = { // 定义到StreamRecs表的连接 val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION) // 如果表中已有uid对应的数据,则删除 streamRecsCollection.findAndRemove(MongoDBObject("uid" -> uid)) // 将streamRecs数据存入表中 streamRecsCollection.insert(MongoDBObject("uid" -> uid, "recs" -> streamRecs.map(x => MongoDBObject("mid" -> x._1, "score" -> x._2))))}
3.5 更新实时推荐结果
当计算出候选电影的推荐优先级的数组updatedRecommends后,这个数组将被发送到Web后台服务器,与后台服务器上 u s e r I d userId userId的上次实时推荐结果recentRecommends进行合并、替换并选出优先级 E E E前 K K K大的电影作为本次新的实时推荐。具体而言:
- 合并:将updatedRecommends 与recentRecommends并集合成为一个新的数组
- 替换(去重):当updatedRecommends 与recentRecommends有重复的电影m i d mid mid,recentRecommends中m i d mid mid的推荐优先级由于是上次实时推荐的结果,于是将其作废,并替换成代表了更新后的updatedRecommends的m i d mid mid的推荐优先级
- 选取T o p K TopK TopK:在合并、替换后的数组上,根据每个MOVIE的推荐优先级,选择出前K K K大的电影,作为本次实时推荐的最终结果
四、实时系统联调
系统实时推荐的数据流向是:业务系统->日志->Flume日志采集->Kafka Streaming数据清洗和预处理->Spark Streaming流式计算。在完成实时推荐服务的代码后,应该与其他工具进行连调测试,确保系统正常运行
4.1 启动实时系统的基本组件
启动实时推荐系统StreamingRecommender以及MongoDB、Redis
4.2 启动 Z o o k e e p e r Zookeeper Zookeeper
bin/zkServer.sh start
4.3 启动 K a f k a Kafka Kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
4.4 启动 K a f k a S t r e a m i n g Kafka Streaming KafkaStreaming程序
在recommender下新建module,KafkaStreaming,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml文件需要引入依赖;
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.2.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency></dependencies><build> <finalName>kafkastream</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>com.IronmanJay.kafkastream.Application</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins></build>
在src/main/java下新建java类com.IronmanJay.kafkastreaming.Application
package com.IronmanJay.kafkastream;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.processor.TopologyBuilder;import org.apache.kafka.streams.processor.WallclockTimestampExtractor;import java.util.Properties;public class Application { public static void main(String[] args) { String brokers = "linux:9092"; String zookeepers = "linux:2181"; // 输入和输出的topic String from = "log"; String to = "recommender"; // 定义kafka streaming的配置 Properties settings = new Properties(); settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers); settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); // 创建 kafka stream 配置对象 StreamsConfig config = new StreamsConfig(settings); // 创建一个拓扑建构器 TopologyBuilder builder = new TopologyBuilder(); // 定义流处理的拓扑结构 builder.addSource("SOURCE", from) .addProcessor("PROCESSOR", () -> new LogProcessor(), "SOURCE") .addSink("SINK", to, "PROCESSOR"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); System.out.println("Kafka stream started!>>>>>>>>>>>"); }}
这个程序会将topic为"log"的信息流获取来做处理,并以"recommender"为新的topic转发出去,流处理程序LogProcess.java如下所示:
package com.IronmanJay.kafkastream;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;public class LogProcessor implements Processor<byte[], byte[]> { private ProcessorContext context; @Override public void init(ProcessorContext processorContext) { this.context = processorContext; } @Override public void process(byte[] dummy, byte[] line) { // 把收集到的日志信息用string表示 String input = new String(line); // 根据前缀MOVIE_RATING_PREFIX:从日志信息中提取评分数据 if (input.contains("MOVIE_RATING_PREFIX:")) { System.out.println("movie rating data coming!>>>>>>>>>>>" + input); input = input.split("MOVIE_RATING_PREFIX:")[1].trim(); context.forward("logProcessor".getBytes(), input.getBytes()); } } @Override public void punctuate(long l) { } @Override public void close() { }}
完成代码后,启动Application
4.5 配置并启动 F l u m e Flume Flume
在 F l u m e Flume Flume的conf下新建log-kafka.properties,对 F l u m e Flume Flume连接Kafka做配置:
agent.sources = exectailagent.channels = memoryChannelagent.sinks = kafkasink# For each one of the sources, the type is definedagent.sources.exectail.type = exec# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录agent.sources.exectail.command = tail –f 自己的日志目录路径agent.sources.exectail.interceptors=i1agent.sources.exectail.interceptors.i1.type=regex_filter# 定义日志过滤前缀的正则agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+# The channel can be defined as follows.agent.sources.exectail.channels = memoryChannel# Each sink's type must be definedagent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSinkagent.sinks.kafkasink.kafka.topic = logagent.sinks.kafkasink.kafka.bootstrap.servers = linux:9092agent.sinks.kafkasink.kafka.producer.acks = 1agent.sinks.kafkasink.kafka.flumeBatchSize = 20#Specify the channel the sink should useagent.sinks.kafkasink.channel = memoryChannel# Each channel's type is defined.agent.channels.memoryChannel.type = memory# Other config values specific to each type of channel(sink or source)# can be defined as well# In this case, it specifies the capacity of the memory channelagent.channels.memoryChannel.capacity = 10000
配置好后,启动 F l u m e Flume Flume:
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
4.6 启动业务系统后台
将业务代码加入系统中。注意在businessServer/src/main/resources/下的log4j.properties中,log4j.appender.file.File的值应该替换为自己的日志目录,与 F l u m e Flume Flume中的配置应该相同
启动业务系统后台,访问linux:8088/index.html,点击某个电影进行评分,查看实时推荐列表是否会发生变化
总结
这篇博文是整个系列博文的重点,又是难点,没想到洋洋洒洒写了大约2万字,希望读者可以认认真真一步一步的操作,当系统做到这里的时候,基本已经成形了,后面我将会给大家讲解系统的后台与前端服务功能的实现,讲解框架与实现原理,那就,下篇博文见啦!