1.1 通过RDD实战电影点评系统入门及源码阅读

日常的数据来源有很多渠道,如网络爬虫、网页埋点、系统日志等。下面的案例中使用的是用户观看电影和点评电影的行为数据,数据来源于网络上的公开数据,共有3个数据文件:uers.dat、ratings.dat和movies.dat。

其中,uers.dat的格式如下:

1.  UserID::Gender::Age::Occupation::Zip-code

这个文件里共有6040个用户的信息,每行中用“::”隔开的详细信息包括ID、性别(F、M分别表示女性、男性)、年龄(使用7个年龄段标记)、职业和邮编。

ratings.dat的格式如下:

1.  UserID::MovieID::Rating::Timestamp

这个文件记录的是评分信息,即用户ID、电影ID、评分(满分是5分)和时间戳。

movies.dat的格式如下:

1.  MovieID::Title::Genres

这个文件记录的是电影信息,即电影ID、电影名称和电影类型。

1.1.1 Spark核心概念图解

进入到案例实战前,首先来看几个至关重要的概念,这些概念承载着Spark集群运转和程序运行的重要使命。Spark运行架构图如图1-1所示。

Master(图1-1中的Cluster Manager):就像Hadoop有NameNode和DataNode一样,Spark有Master和Worker。Master是集群的领导者,负责管理集群资源,接收Client提交的作业,以及向Worker发送命令。

Worker(图1-1中的Worker Node):集群中的Worker,执行Master发送的指令,来具体分配资源,并在这些资源中执行任务。

Driver:一个Spark作业运行时会启动一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage,并调度Task到Executor上。

Executor:真正执行作业的地方。Executor分布在集群中的Worker上,每个Executor接收Driver的命令来加载和运行Task,一个Executor可以执行一到多个Task。

图1-1 Spark运行架构图

SparkContext:是程序运行调度的核心,由高层调度器DAGScheduler划分程序的每个阶段,底层调度器TaskScheduler划分每个阶段的具体任务。SchedulerBackend管理整个集群中为正在运行的程序分配的计算资源Executor。

DAGScheduler:负责高层调度,划分stage并生成程序运行的有向无环图。

TaskScheduler:负责具体stage内部的底层调度,具体task的调度、容错等。

Job:(正在执行的叫ActiveJob)是Top-level的工作单位,每个Action算子都会触发一次Job,一个Job可能包含一个或多个Stage。

Stage:是用来计算中间结果的Tasksets。Tasksets中的Task逻辑对于同一个RDD内的不同partition都一样。Stage在Shuffle的地方产生,此时下一个Stage要用到上一个Stage的全部数据,所以要等到上一个Stage全部执行完才能开始。Stage有两种:ShuffleMapStage和ResultStage,除了最后一个Stage是ResultStage外,其他Stage都是ShuffleMapStage。ShuffleMapStage会产生中间结果,以文件的方式保存在集群里,Stage经常被不同的Job共享,前提是这些Job重用了同一个RDD。

Task:任务执行的工作单位,每个Task会被发送到一个节点上,每个Task对应RDD的一个partition。

RDD:是不可变的、Lazy级别的、粗粒度的(数据集级别的而不是单个数据级别的)数据集合,包含了一个或多个数据分片,即partition。

另外,Spark程序中有两种级别的算子:Transformation和Action。Transformation算子会由DAGScheduler划分到pipeline中,是Lazy级别的不会触发任务的执行;Action算子会触发Job来执行pipeline中的运算。

介绍完上面的关键概念,下面开始进入到程序编写阶段。

首先写好Spark程序的固定框架,以便于在处理和分析数据的时候专注于业务逻辑本身。

创建一个Scala的object类,在main方法中配置SparkConf和SparkContext,这里指定程序在本地运行,并且把程序名字设置为“RDD_Movie_Users_Analyzer”。

RDD_Movie_Users_Analyzer代码如下。

1.   val conf = new SparkConf().setMaster("local[*]")
2.  .setAppName("RDD_Movie_Users_Analyzer")
3.   /**
    *Spark 2.0    引入 SparkSession    封装了  SparkContext    和 SQLContext,并且会在
    *builder的getOrCreate方法中判断是否有符合要求的SparkSession存在,有则使用,
    *没有则进行创建
    */
4.  val spark = SparkSession.builder.config(conf).getOrCreate()
5.  //获取SparkSession的SparkContext
6.  val sc = spark.sparkContext
7.  //把Spark程序运行时的日志设置为warn级别,以方便查看运行结果
8.  sc.setLogLevel("warn")
9.  //把用到的数据加载进来转换为RDD,此时使用sc.textFile并不会读取文件,而是标记了有
    //这个操作,遇到Action级别算子时才会真正去读取文件
10. val usersRDD = sc.textFile(dataPath + "users.dat")
11. val moviesRDD = sc.textFile(dataPath + "movies.dat")
12. val ratingsRDD = sc.textFile(dataPath + "ratings.dat")
13. /**具体数据处理的业务逻辑*/
14. //最后关闭SparkSession
15. spark.stop

1.1.2 通过RDD实战电影点评系统案例

首先我们来写一个案例计算,并打印出所有电影中评分最高的前10个电影名和平均评分。

第一步:从ratingsRDD中取出MovieID和rating,从moviesRDD中取出MovieID和Name,如果后面的代码重复使用这些数据,则可以把它们缓存起来。首先把使用map算子上面的RDD中的每一个元素(即文件中的每一行)以“::”为分隔符进行拆分,然后再使用map算子从拆分后得到的数组中取出需要用到的元素,并把得到的RDD缓存起来。

1.  println("所有电影中平均得分最高(口碑最好)的电影:")
2.  val movieInfo = moviesRDD.map(_.split("::")).map(x => (x(0), x(1))).cache()
3.  val ratings = ratingsRDD.map(_.split("::"))
4.  .map(x => (x(0), x(1), x(2))).cache()

第二步:从ratings的数据中使用map算子获取到形如(movieID,(rating,1))格式的RDD,然后使用reduceByKey把每个电影的总评分以及点评人数算出来。

1.  val moviesAndRatings = ratings.map(x => (x._2, (x._3.toDouble, 1)))
2.        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

此时得到的RDD格式为(movieID,(Sum(ratings),Count(ratings)))。

第三步:把每个电影的Sum(ratings)和Count(ratings)相除,得到包含了电影ID和平均评分的RDD:

1.  val avgRatings= moviesAndRatings.map(x => (x._1,x._2._1.toDouble / x._2._2))

第四步:把avgRatings与movieInfo通过关键字(key)连接到一起,得到形如(movieID, (MovieName,AvgRating))的RDD,然后格式化为(AvgRating,MovieName),并按照key(也就是平均评分)降序排列,最终取出前10个并打印出来。

1.  avgRatings.join(movieInfo).map(item => (item._2._1,item._2._2))
2.        .sortByKey(false).take(10)
3.        .foreach(record => println(record._2+"评分为:"+record._1))

评分最高电影运行结果如图1-2所示。

图1-2 评分最高电影运行结果

接下来我们来看另外一个功能的实现:分析最受男性喜爱的电影Top10和最受女性喜爱的电影Top10。

首先来分析一下:单从ratings中无法计算出最受男性或者女性喜爱的电影Top10,因为该RDD中没有Gender信息,如果需要使用Gender信息进行Gender的分类,此时一定需要聚合。当然,我们力求聚合使用的是mapjoin(分布式计算的一大痛点是数据倾斜,map端的join一定不会数据倾斜),这里是否可使用mapjoin?不可以,因为map端的join是使用broadcast把相对小得多的变量广播出去,这样可以减少一次shuffle,这里,用户的数据非常多,所以要使用正常的join。

1.   Val usersGender = usersRDD.map(_.split("::")).map(x => (x(0), x(1)))
2.  val genderRatings = ratings.map(x => (x._1, (x._1, x._2, x._3)))
3.  .join(usersGender).cache()
4.  genderRatings.take(10).foreach(println)

使用join连接ratings和users之后,对分别过滤出男性和女性的记录进行处理:

1.  val maleFilteredRatings = genderRatings
2.  .filter(x => x._2._2.equals("M")).map(x => x._2._1)
3.  val femaleFilteredRatings = genderRatings
4.  .filter(x => x._2._2.equals("F")).map(x => x._2._1)

接下来对两个RDD进行处理,处理逻辑和上面的案例相同,最终打印出来的结果分别如图1-3和图1-4所示:

所有电影中最受男性喜爱的电影Top10业务代码如下。所有电影中最受女性喜爱的电影Top10业务代码如下。

1.   println("所有电影中最受男性喜爱的电影Top10:")
2.  maleFilteredRatings.map(x => (x._2, (x._3.toDouble, 1)))
3.    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
4.  .map(x => (x._1,x._2._1.toDouble / x._2._2))
5.  .join(movieInfo)
6.  .map(item => (item._2._1,item._2._2))
7.  .sortByKey(false) .take(10)
8.  .foreach(record => println(record._2+"评分为:"+record._1))

图1-3 最受男性喜爱的电影运行结果

1.   println("所有电影中最受女性喜爱的电影Top10:")
2.  femaleFilteredRatings.map(x => (x._2, (x._3.toDouble, 1)))
3.    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
4.  .map(x => (x._1,x._2._1.toDouble / x._2._2)) .join(movieInfo)
5.  .map(item => (item._2._1,item._2._2)).sortByKey(false) .take(10)
6.  .foreach(record => println(record._2+"评分为:"+record._1))

图1-4 最受女性喜爱的电影运行结果

在现实业务场景中,二次排序非常重要,并且经常遇到。下面来模拟一下这些场景,实现对电影评分数据进行二次排序,以Timestamp和Rating两个维度降序排列,值得一提的是,Java版本的二次排序代码非常烦琐,而使用Scala实现就会很简捷,首先我们需要一个继承自Ordered和Serializable的类。

1.   class SecondarySortKey(val first: Double, val second: Double)
2.  extends Ordered[SecondarySortKey] with Serializable {
3.  //在这个类中重写compare方法
4.  override def compare(other: SecondarySortKey): Int = {
5.  //既然是二次排序,那么首先要判断第一个排序字段是否相等,如果不相等,就直接排序
6.  if (this.first - other.first != 0) {
7.        (this.first - other.first).toInt
8.      } else {
9.       //如果第一个字段相等,则比较第二个字段,若想实现多次排序,也可以按照这个模式继
         //续比较下去
10.       if (this.second - other.second > 0) {
11.         Math.ceil(this.second - other.second).toInt
12.       } else if (this.second - other.second < 0) {
13.         Math.floor(this.second - other.second).toInt
14.       } else {
15.         (this.second - other.second).toInt
16.       }
17.     }
18.   }
19. }

然后再把RDD的每条记录里想要排序的字段封装到上面定义的类中作为key,把该条记录整体作为value。

1.    println("对电影评分数据以Timestamp和Rating两个维度进行二次降序排列:")
2.  val pairWithSortkey = ratingsRDD.map(line => {
3.        val splited = line.split("::")
4.        (new SecondarySortKey(splited(3).toDouble, splited(2).toDouble), line)
5.      })
6.  //直接调用sortByKey,此时会按照之前实现的compare方法排序
7.  val sorted = pairWithSortkey.sortByKey(false)
8.
9.  val sortedResult = sorted.map(sortedline => sortedline._2)
10. sortedResult.take(10).foreach(println)

取出排序后的RDD的value,此时这些记录已经是按照时间戳和评分排好序的,最终打印出的结果如图1-5所示,从图中可以看到已经按照timestamp和评分降序排列了。

图1-5 电影系统二次排序运行结果