12.3 通过RDD分析各种类型的最喜爱电影TopN及性能优化技巧

通过RDD分析大数据电影点评系统各种类型的最喜爱电影TopN。本节分析最受男性喜爱的电影Top10和最受女性喜爱的电影Top10。

评级文件ratings.dat的格式描述如下。

1.  UserID::MovieID::Rating::Timestamp
2.  用户ID、电影ID、评分数据、时间戳

用户文件users.dat的格式描述如下。

1.  UserID::Gender::Age::Occupation::Zip-code
2.  用户ID、性别、年龄、职业、邮编代码

单单从评分数据ratings中无法计算出最受男性或者女性喜爱的电影Top10,因为该RDD中没有性别Gender信息,如果需要使用性别Gender信息进行性别Gender的分类,此时一定需要聚合。当然,我们力求聚合的使用是mapjoin(分布式计算的杀手是数据倾斜,Mapper端的Join是一定不会数据倾斜的),这里可否使用mapjoin呢?不可以,因为用户的数据非常多!所以,这里要使用正常的Join,此处的场景不会数据倾斜,因为用户一般都均匀地分布。

最受男性喜爱的电影Top10和最受女性喜爱的电影Top10分析需注意以下事项。

(1)因为要再次使用电影数据的RDD,所以复用了前面Cache的ratings数据。

(2)在根据性别过滤出数据后,关于TopN部分的代码直接复用前面的代码就行了。

(3)要进行Join,需要key-value。

(4)在进行Join的时候,通过take等方法注意Join后的数据格式(3319,((3319, 50, 4.5),F))。

(5)使用数据冗余来实现代码复用或者更高效地运行,这是企业级项目的一个非常重要的技巧!

大数据电影点评系统中,统计最受男性喜爱的电影Top10和最受女性喜爱的电影Top10,我们先分别过滤出男性、女性相关的数据,具体实现思路如下:

(1)对ratings中的(用户ID,电影ID,评分)元组进行map转换,格式化成Key-Value,即(用户ID,(用户ID,电影ID,评分))。

(2)对usersRDD中的每行数据按"::"分隔符分割,然后进行map转换,格式化成Key-Value,即(用户ID,性别)。

(3)将(用户ID,(用户ID,电影ID,评分))与(用户ID,性别)进行Join生成新的genderRatings RDD。格式为:(用户ID,((用户ID,电影ID,评分),性别)),性别,并且进行Cache缓存。

(4)对genderRatings RDD进行过滤转换,从元组(x._1用户ID,(x._2._1(用户ID,电影ID,评分),x._2._2性别))过滤出x._2._2性别等于男性的数据。然后进行map转换为x._2._1,即转换成(用户ID,电影ID,评分)格式,生成maleFilteredRatings。

(5)对genderRatings RDD进行过滤转换,从元组(x._1用户ID,(x._2._1(用户ID,电影ID,评分),x._2._2性别))过滤出x._2._2性别等于女性的数据。然后进行map转换为x._2._1,即转换成(用户ID,电影ID,评分)格式,生成femaleFilteredRatings。

从大数据电影点评系统中过滤男性、女性相关的数据的代码如下。

1.  val male = "M"
2.  val female = "F"
3.  val genderRatings = ratings.map(x => (x._1, (x._1, x._2, x._3))).join(
4.    usersRDD.map(_.split("::")).map(x => (x(0), x(1)))).cache()
5.  genderRatings.take(2).foreach(println)
6.  val maleFilteredRatings: RDD[(String, String, String)] = gender
    Ratings.filter(x => x._2._2.equals("M")).map(x => x._2._1)
7.  val femaleFilteredRatings = genderRatings.filter(x => x._2._2.equals
    ("F")).map(x => x._2._1)

在IDEA中运行代码,打印出genderRatings的数据,取10个数据,格式为:(用户ID,((用户ID,电影ID,评分),性别)),结果如下。

1.  (3319,((3319,32,5),F))
2.  (3319,((3319,50,4.5),F))
3.  (3319,((3319,163,4.5),F))
4.  (3319,((3319,180,5),F))
5.  (3319,((3319,296,5),F))
6.  (3319,((3319,318,5),F))
7.  (3319,((3319,405,4),F))
8.  (3319,((3319,914,4.5),F))
9.  (3319,((3319,1088,4),F))
10. (3319,((3319,1136,5),F))

电影点评系统用户行为分析,统计所有电影中最受男性喜爱的电影Top10,具体实现思路如下:

(1)将性别为男的用户过滤以后的数据(用户ID,电影ID,评分)进行map转换,格式化成为Key-Value的方式,即(电影ID,(评分,1))。

(2)使用reduceByKey算子对Value值进行汇聚转换,对两个具有相同Key值,而Value不同的元组,如(电影ID,(x.评分,x.1)),(电影ID,(y.评分,y.1)),计算得出(x的评分+y的评分,x的计数+y的计数),转换以后Key是电影ID,Value是(总的评分,总的点评人数),格式化成为Key-Value,即(电影ID,(总评分,总点评人数))。

(3)reduceByKey算子执行完毕,接下来进行map转换操作,交换Key-Value值,并且计算出电影平均分=总评分/总点评人数,即将(电影ID,(总评分,总点评人数))转换成((总评分/总点评人数),电影ID),然后使用sortByKey(false)算子按电影平均分降序排列。

(4)再次进行Key和Value的交换,打印输出。使用map转换函数将((总评分/总点评人数),电影ID)进行交换,转换为(电影ID,(总评分/总点评人数)),再通过take(10)算子获取所有电影中最受男性喜爱的电影Top10,进行打印输出。

在所有电影中分析最受男性喜爱的电影Top10的代码如下。

1.  println("所有电影中最受男性喜爱的电影Top10:")
2.      maleFilteredRatings.map(x=>(x._2,(x._3.toDouble, 1)))//格式化成为Key-Value
3.        .reduceByKey((x, y) => (x._1 + y._1,x._2 + y._2))
          //对Value进行reduce操作,分别得出每部电影的总的评分和总的点评人数
4.        .map(x => (x._2._1.toDouble / x._2._2, x._1))  //求出电影平均分
5.        .sortByKey(false) //降序排列
6.        .map(x => (x._2, x._1))
7.        .take(10)              //取Top10
8.        .foreach(println) //打印到控制台

在IDEA中运行代码,结果如下。

1.  所有电影中最受男性喜爱的电影Top10:
2.  (855,5.0)
3.  (6075,5.0)
4.  (1166,5.0)
5.  (3641,5.0)
6.  (1045,5.0)
7.  (4136,5.0)
8.  (2538,5.0)
9.  (7227,5.0)
10. (8484,5.0)
11. (5599,5.0)

同样地,在电影点评系统用户行为分析中,我们可以统计所有电影中最受女性喜爱的电影Top10,具体实现思路和最受男性喜爱的电影Top10类似,这里不再赘述。

从所有电影中分析最受女性喜爱的电影Top10的代码如下。

1.  println("所有电影中最受女性喜爱的电影Top10:")
2.   femaleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1)))//格式化成为Key-Value
3.     .reduceByKey((x, y) => (x._1 + y._1,x._2 + y._2))
       //对Value进行reduce操作,分别得出每部电影的总的评分和总的点评人数
4.     .map(x => (x._2._1.toDouble / x._2._2, x._1))  //求出电影平均分
5.     .sortByKey(false) //降序排列
6.     .map(x => (x._2, x._1))
7.     .take(10)             //取Top10
8.     .foreach(println) //打印到控制台

在IDEA中运行代码,结果如下。

1.  所有电影中最受女性喜爱的电影Top10:
2.  [Stage 43:=================================>  7 + 1) / 8](789,5.0)
3.  (855,5.0)
4.  (32153,5.0)
5.  (4763,5.0)
6.  (26246,5.0)
7.  (2332,5.0)
8.  (503,5.0)
9.  (4925,5.0)
10. (8767,5.0)
11. (44657,5.0)