12.2 通过RDD实现电影流行度分析

本节统计所有电影中平均得分最高(口碑最好)的电影及观看人数最多(流行度最高)的电影。

所有电影中平均得分最高的Top10电影实现思路:如果想算总的评分,一般肯定需要reduceByKey操作或者aggregateByKey操作。

评级文件ratings.dat的格式描述如下。

1.  UserID::MovieID::Rating::Timestamp
2.  用户ID、电影ID、评分数据、时间戳

第一步:把数据变成Key-Value,大家想一下在这里什么是Key,什么是Value?把MovieID设置为Key,把Rating设置为Value。具体实现过程:将ratingsRDD中的每行数据按"::"分隔符进行分割,然后map格式化为(用户ID,电影ID,评分)元组;接下来对ratings进行map转换,取ratings元组的第2个元素"电影ID"作为Key,(第3个元素即评分,1)元组值作为Value,格式化成为Key-Value的方式,即(电影ID,(评分,1))。

第二步:通过reduceByKey操作或者aggregateByKey实现聚合,然后呢?具体实现过程:对两个具有相同Key,而Value不同的元组,如(电影ID,(x.评分,x.1)),(电影ID,(y.评分,y.1)),我们使用reduceByKey算子对Value值进行汇聚转换,计算得出(x的评分+y的评分,x的计数+y的计数),转换以后Key是电影ID,Value是(总的评分,总的点评人数),格式化为Key-Value,即(电影ID,(总评分,总点评人数))。

第三步:排序,如何做?进行Key和Value的交换。上一步reduceByKey算子执行完毕,接下来进行map转换操作,交换Key-Value值,并且计算出电影平均分=总评分/总点评人数,即将(电影ID,(总评分,总点评人数))转换成((总评分/总点评人数),电影ID),然后使用sortByKey(false)算子按电影平均分降序排列,再通过take(10)算子获取所有电影中平均得分最高的Top10,打印输出。

所有电影中电影粉丝或者观看人数最多的电影实现思路:

第一步:把数据变成Key-Value:取ratings元组的第2个元素电影ID作为Key,计数1次作为Value,格式化成为Key-Value,即(电影ID,1)。

第二步:通过reduceByKey操作实现聚合:对相同Key的Value值进行累加。生成Key-Value,即(电影ID,总次数)。

第三步:排序,进行Key和Value的交换。上一步reduceByKey算子执行完毕,然后进行map转换操作,交换Key-Value值,即将(电影ID,总次数)转换成(总次数,电影ID),然后使用sortByKey(false)算子按总次数降序排列。

第四步:再次进行Key和Value的交换,打印输出。我们使用map转换函数将(总次数,电影ID)进行交换,转换为(电影ID,总次数),再通过take(10)算子获取所有电影中粉丝或者观看人数最多的电影Top10,打印输出。

大数据电影点评系统中,电影流行度分析须注意以下事项。

(1)转换数据格式的时候一般都会使用map操作,有时转换可能特别复杂,需要在map方法中调用第三方jar或者so库。

(2)RDD从文件中提取的数据成员默认都是String类型,需要根据实际需要进行转换类型。

(3)RDD如果要重复使用,一般都会进行Cache操作。

(4)重磅注意事项,RDD的Cache操作之后不能直接再跟其他的算子操作,否则在一些版本中Cache不生效。

电影点评系统用户行为分析,统计所有电影中平均得分最高(口碑最好)的电影以及电影粉丝或者观看人数最多的电影的代码如下。

1.   println("所有电影中平均得分最高(口碑最好)的电影:")
2.   val ratings= ratingsRDD.map(_.split("::")).map(x => (x(0), x(1),
     x(2))).cache()  //格式化出电影ID和评分
3.   ratings.map(x => (x._2, (x._3.toDouble, 1)))//格式化为Key-Value的方式
4.     .reduceByKey((x, y) => (x._1 + y._1,x._2 + y._2))
       //对Value进行reduce操作,分别得出每部电影的总的评分和总的点评人数
5.     .map(x => (x._2._1.toDouble / x._2._2, x._1))  //求出电影平均分
6.     .sortByKey(false) //降序排列
7.     .take(10)              //取Top10
8.     .foreach(println) //打印到控制台
9.
10.  /**
11.    *上面的功能计算的是口碑最好的电影,接下来分析粉丝或者观看人数最多的电影
12.    */
13.  println("所有电影中粉丝或者观看人数最多的电影:")
14.  ratings.map(x => (x._2, 1)).reduceByKey(_+_).map(x => (x._2, x._1)).
     sortByKey(false)
15.    .map(x => (x._2, x._1)).take(10).foreach(println)

在IDEA中运行代码,结果如下。

1.  所有电影中平均得分最高(口碑最好)的电影:
2.  [Stage 17:=============================>   (4 + 4) / 8](5.0,33264)
3.  (5.0,64275)
4.  (5.0,42783)
5.  (5.0,53355)
6.  (5.0,51209)
7.  (4.75,26073)
8.  (4.75,26048)
9.  (4.75,65001)
10. (4.75,5194)
11. (4.75,4454)
12. 所有电影中粉丝或者观看人数最多的电影:
13. (296,34864)
14. (356,34457)
15. (593,33668)
16. (480,32631)
17. (318,31126)
18. (110,29154)
19. (457,28951)
20. (589,28948)
21. (260,28566)
22. (150,27035)