1.2 通过DataFrame和DataSet实战电影点评系统

DataFrameAPI是从Spark 1.3开始就有的,它是一种以RDD为基础的分布式无类型数据集,它的出现大幅度降低了普通Spark用户的学习门槛。

DataFrame类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以解析到具体数据的结构信息,从而对DataFrame中的数据源以及对DataFrame的操作进行了非常有效的优化,从而大幅提升了运行效率。

DataSetAPI是从1.6版本提出的,在Spark 2.2的时候,DataSet和DataFrame趋于稳定,可以投入生产环境使用。与DataFrame不同的是,DataSet是强类型的,而DataFrame实际上就是DataSet[Row](也就是Java中的DataSet<Row>)。

DataSet是Lazy级别的,Transformation级别的算子作用于DataSet会得到一个新的DataSet。当Action算子被调用时,Spark的查询优化器会优化Transformation算子形成的逻辑计划,并生成一个物理计划,该物理计划可以通过并行和分布式的方式来执行。

反观RDD,由于无从得知其中数据元素的具体内部结构,故很难被Spark本身自行优化,对于新手用户很不友好,但是,DataSet底层是基于RDD的,所以最终的优化尽头还是对RDD的优化,这就意味着优化引擎不能自动优化的地方,用户在RDD上可能有机会进行手动优化。

1.2.1 通过DataFrame实战电影点评系统案例

现在我们通过实现几个功能来了解DataFrame的具体用法。先来看第一个功能:通过DataFrame实现某部电影观看者中男性和女性不同年龄分别有多少人。

1.   println("功能一:通过DataFrame实现某部电影观看者中男性和女性不同年龄人数")
2.  //首先把Users的数据格式化,即在RDD的基础上增加数据的元数据信息
3.  val schemaForUsers = StructType(
4.  "UserID::Gender::Age::OccupationID::Zip-code".split("::")
5.  .map(column => StructField(column, StringType, true)))
6.  //然后把我们的每一条数据变成以Row为单位的数据
7.  val usersRDDRows = usersRDD
8.  .map(_.split("::"))
9.  .map(line =>
10. Row(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim))
11. //使用SparkSession的createDataFrame方法,结合Row和StructType的元数据信息
    //基于RDD创建DataFrame,这时RDD就有了元数据信息的描述
12. val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaForUsers)
13. //也可以对StructType调用add方法来对不同的StructField赋予不同的类型
14. val schemaforratings = StructType("UserID::MovieID".split("::").
15. map(column => StructField(column, StringType, true)))
16. .add("Rating", DoubleType, true)
17. .add("Timestamp",StringType, true)
18.
19. val ratingsRDDRows = ratingsRDD
20. .map(_.split("::"))
21. .map(line =>
22.  Row(line(0).trim,line(1). trim,line(2).trim.toDouble,line(3).trim))
23. val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows,
    schemaforratings)
24. //接着构建movies的DataFrame
25. val schemaformovies = StructType("MovieID::Title::Genres".split("::")
26. .map(column => StructField(column, StringType, true)))
27. val moviesRDDRows = moviesRDD
28. .map(_.split("::"))
29. .map(line => Row(line(0).trim,line(1).trim,line(2).trim))
30. val moviesDataFrame = spark.createDataFrame(moviesRDDRows,
    schemaformovies)
31.
32. //这里能够直接通过列名MovieID为1193过滤出这部电影,这些列名都是在上面指定的
33. ratingsDataFrame.filter(s" MovieID = 1193")
34. //Join的时候直接指定基于UserID进行Join,这相对于原生的RDD操作而言更加方便快捷
35. .join(usersDataFrame, "UserID")
36. //直接通过元数据信息中的Gender和Age进行数据的筛选
37. .select("Gender", "Age")
38. //直接通过元数据信息中的Gender和Age进行数据的groupBy操作
39. .groupBy("Gender", "Age")
40. //基于groupBy分组信息进行count统计操作,并显示出分组统计后的前10条信息
41. .count().show(10)

最终打印结果如图1-6所示,类似一张普通的数据库表。

图1-6 电影观看者中男性和女性人数

上面案例中的代码无论是从思路上,还是从结构上都和SQL语句十分类似,下面通过写SQL语句的方式来实现上面的案例。

1.   println("功能二:用LocalTempView实现某部电影观看者中不同性别不同年龄分别有多少
    人?")
2.  //既然使用SQL语句,那么表肯定是要有的,所以需要先把DataFrame注册为临时表
3.  ratingsDataFrame.createTempView("ratings")
4.  usersDataFrame.createTempView("users")
5.  //然后写SQL语句,直接使用SparkSession的sql方法执行SQL语句即可
6.  val sql_local = "SELECT Gender, Age, count(*) from  users u join
7.  ratings as r on u.UserID = r.UserID where MovieID = 1193 group by Gender,
    Age"
8.  spark.sql(sql_local).show(10)

这样我们就可以得到与上面案例相同的结果,这对写SQL比较多的用户是十分友好的。但是有一个问题需要注意,这里调用createTempView创建的临时表是会话级别的,会话结束时这个表也会消失。那么,怎么创建一个Application级别的临时表呢?可以使用createGlobalTempView来创建临时表,但是这样就要在写SQL语句时在表名前面加上global_temp,例如:

1.   ratingsDataFrame.createGlobalTempView("ratings")
2.  usersDataFrame.createGlobalTempView("users")
3.
4.  val sql = "SELECT Gender, Age, count(*) from  global_temp.users u join
    global_temp.ratings as r on u.UserID = r.UserID where MovieID = 1193 group
    by Gender, Age"
5.  spark.sql(sql).show(10)

第一个DataFrame案例实现了简单的类似SQL语句的功能,但这是远远不够的,我们要引入一个隐式转换来实现复杂的功能:

1.   import spark.sqlContext.implicits._
2.  ratingsDataFrame.select("MovieID", "Rating")
3.  .groupBy("MovieID").avg("Rating")
4.  //接着我们可以使用“$”符号把引号里的字符串转换成列来实现相对复杂的功能,例如,下面
    //我们把avg(Rating)作为排序的字段降序排列
5.  .orderBy($"avg(Rating)".desc).show(10)

从图1-7的结果可以看到,求平均值的那一列列名和在SQL语句里使用函数时的列名一样变成了avg(Rating),程序中的orderBy里传入的列名要和这个列名一致,否则会报错,提示找不到列。

图1-7 电影系统SQL运行结果

有时我们也可能会在使用DataFrame的时候在中间某一步转换到RDD里操作,以便实现更加复杂的逻辑。下面来看一下DataFrame和RDD的混合编程。

1.  ratingsDataFrame.select("MovieID", "Rating")
2.  .groupBy("MovieID").avg("Rating")
3.  //这里直接使用DataFrame的rdd方法转到RDD里操作
4.  .rdd.map(row =>(row(1),(row(0), row(1))))
5.  .sortBy(_._1.toString.toDouble, false)
6.  .map(tuple => tuple._2)
7.  .collect.take(10).foreach(println)

1.2.2 通过DataSet实战电影点评系统案例

前面提到的DataFrame其实就是DataSet[Row],所以只要学会了DataFrame的使用,就可以快速接入DataSet,只不过在创建DataSet的时候要注意与创建DataFrame的方式略有不同。DataSet可以由DataFrame转换而来,只需要用yourDataFrame.as[yourClass]即可得到封装了yourClass类型的DataSet,之后就可以像操作DataFrame一样操作DataSet了。接下来我们讲一下如何直接创建DataSet,因为DataSet是强类型的,封装的是具体的类(DataFrame其实封装了Row类型),而类本身可以视作带有Schema的,所以只需要把数据封装进具体的类,然后直接创建DataSet即可。

首先引入一个隐式转换,并创建几个caseClass用来封装数据。

1.   import spark.implicits._
2.  case class User(UserID:String, Gender:String, Age:String, OccupationID:
    String, Zip_Code:String)
3.  case    class     Rating(UserID:String,        MovieID:String,      Rating:Double,
    Timestamp:String)
4.  然后把数据封装进这些Class:
5.  val usersForDSRDD = usersRDD.map(_.split("::")).map(line =>
6.       User(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim))
7.       最后直接创建DataSet:
8.  val usersDataSet = spark.createDataset[User](usersForDSRDD)
9.  usersDataSet.show(10)
10.

电影系统运行结果如图1-8所示,列名为User类的属性名。下面使用同样的方法创建ratingsDataSet并实现一个案例:找出观看某部电影的不同性别不同年龄的人数。

1.   val ratingsForDSRDD = ratingsRDD.map(_.split("::")).map(line =>
2.  Rating(line(0).trim,line(1).trim,line(2).trim.toDouble,line(3).trim))
3.  val ratingsDataSet = spark.createDataset(ratingsForDSRDD)
4.  //下面的实现代码和使用DataFrame方法几乎完全一样(把DataFrame换成DataSet即可)
5.  ratingsDataSet.filter(s" MovieID = 1193").join(usersDataSet, "UserID")
6.        .select("Gender", "Age").groupBy("Gender", "Age").count()
7.       .orderBy($"Gender".desc,$"Age").show()

观看电影性别、年龄统计结果如图1-9所示。

图1-8 电影系统运行结果

图1-9 观看电影性别、年龄统计结果

当然,也可以把DataFrame和DataSet混着用(这样做会导致代码混乱,故不建议这样做),得到的结果完全一样。

最后根据源码,有几点需要补充:

RDD的cache方法等于MEMORY_ONLY级别的persist,而DataSet的cache方法等于MEMORY_AND_DISK级别的persist,因为重新计算的代价非常昂贵。如果想使用其他级别的缓存,可以使用persist并传入相应的级别。

RDD.scala源码:

1.  /**
2.    *使用默认的存储级别持久化RDD (`MEMORY_ONLY`).
3.    */
4.   def cache(): this.type = persist()

Dataset.scala源码:

1.  /**
2.    *使用默认的存储级别持久化DataSet (`MEMORY_AND_DISK`).
3.    *
4.    * @group basic
5.    * @since 1.6.0
6.    */
7.   def cache(): this.type = persist()

基于DataSet的计算会像SQL一样被Catalyst引擎解析生成执行查询计划,然后执行。我们可以使用explain方法来查看执行计划。