- Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
- 王家林
- 1449字
- 2021-03-30 21:56:05
12.2 通过RDD实现电影流行度分析
本节统计所有电影中平均得分最高(口碑最好)的电影及观看人数最多(流行度最高)的电影。
所有电影中平均得分最高的Top10电影实现思路:如果想算总的评分,一般肯定需要reduceByKey操作或者aggregateByKey操作。
评级文件ratings.dat的格式描述如下。
1. UserID::MovieID::Rating::Timestamp 2. 用户ID、电影ID、评分数据、时间戳
第一步:把数据变成Key-Value,大家想一下在这里什么是Key,什么是Value?把MovieID设置为Key,把Rating设置为Value。具体实现过程:将ratingsRDD中的每行数据按"::"分隔符进行分割,然后map格式化为(用户ID,电影ID,评分)元组;接下来对ratings进行map转换,取ratings元组的第2个元素"电影ID"作为Key,(第3个元素即评分,1)元组值作为Value,格式化成为Key-Value的方式,即(电影ID,(评分,1))。
第二步:通过reduceByKey操作或者aggregateByKey实现聚合,然后呢?具体实现过程:对两个具有相同Key,而Value不同的元组,如(电影ID,(x.评分,x.1)),(电影ID,(y.评分,y.1)),我们使用reduceByKey算子对Value值进行汇聚转换,计算得出(x的评分+y的评分,x的计数+y的计数),转换以后Key是电影ID,Value是(总的评分,总的点评人数),格式化为Key-Value,即(电影ID,(总评分,总点评人数))。
第三步:排序,如何做?进行Key和Value的交换。上一步reduceByKey算子执行完毕,接下来进行map转换操作,交换Key-Value值,并且计算出电影平均分=总评分/总点评人数,即将(电影ID,(总评分,总点评人数))转换成((总评分/总点评人数),电影ID),然后使用sortByKey(false)算子按电影平均分降序排列,再通过take(10)算子获取所有电影中平均得分最高的Top10,打印输出。
所有电影中电影粉丝或者观看人数最多的电影实现思路:
第一步:把数据变成Key-Value:取ratings元组的第2个元素电影ID作为Key,计数1次作为Value,格式化成为Key-Value,即(电影ID,1)。
第二步:通过reduceByKey操作实现聚合:对相同Key的Value值进行累加。生成Key-Value,即(电影ID,总次数)。
第三步:排序,进行Key和Value的交换。上一步reduceByKey算子执行完毕,然后进行map转换操作,交换Key-Value值,即将(电影ID,总次数)转换成(总次数,电影ID),然后使用sortByKey(false)算子按总次数降序排列。
第四步:再次进行Key和Value的交换,打印输出。我们使用map转换函数将(总次数,电影ID)进行交换,转换为(电影ID,总次数),再通过take(10)算子获取所有电影中粉丝或者观看人数最多的电影Top10,打印输出。
大数据电影点评系统中,电影流行度分析须注意以下事项。
(1)转换数据格式的时候一般都会使用map操作,有时转换可能特别复杂,需要在map方法中调用第三方jar或者so库。
(2)RDD从文件中提取的数据成员默认都是String类型,需要根据实际需要进行转换类型。
(3)RDD如果要重复使用,一般都会进行Cache操作。
(4)重磅注意事项,RDD的Cache操作之后不能直接再跟其他的算子操作,否则在一些版本中Cache不生效。
电影点评系统用户行为分析,统计所有电影中平均得分最高(口碑最好)的电影以及电影粉丝或者观看人数最多的电影的代码如下。
1. println("所有电影中平均得分最高(口碑最好)的电影:") 2. val ratings= ratingsRDD.map(_.split("::")).map(x => (x(0), x(1), x(2))).cache() //格式化出电影ID和评分 3. ratings.map(x => (x._2, (x._3.toDouble, 1)))//格式化为Key-Value的方式 4. .reduceByKey((x, y) => (x._1 + y._1,x._2 + y._2)) //对Value进行reduce操作,分别得出每部电影的总的评分和总的点评人数 5. .map(x => (x._2._1.toDouble / x._2._2, x._1)) //求出电影平均分 6. .sortByKey(false) //降序排列 7. .take(10) //取Top10 8. .foreach(println) //打印到控制台 9. 10. /** 11. *上面的功能计算的是口碑最好的电影,接下来分析粉丝或者观看人数最多的电影 12. */ 13. println("所有电影中粉丝或者观看人数最多的电影:") 14. ratings.map(x => (x._2, 1)).reduceByKey(_+_).map(x => (x._2, x._1)). sortByKey(false) 15. .map(x => (x._2, x._1)).take(10).foreach(println)
在IDEA中运行代码,结果如下。
1. 所有电影中平均得分最高(口碑最好)的电影: 2. [Stage 17:=============================> (4 + 4) / 8](5.0,33264) 3. (5.0,64275) 4. (5.0,42783) 5. (5.0,53355) 6. (5.0,51209) 7. (4.75,26073) 8. (4.75,26048) 9. (4.75,65001) 10. (4.75,5194) 11. (4.75,4454) 12. 所有电影中粉丝或者观看人数最多的电影: 13. (296,34864) 14. (356,34457) 15. (593,33668) 16. (480,32631) 17. (318,31126) 18. (110,29154) 19. (457,28951) 20. (589,28948) 21. (260,28566) 22. (150,27035)