- Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
- 王家林
- 2323字
- 2021-03-30 21:55:46
1.2 通过DataFrame和DataSet实战电影点评系统
DataFrameAPI是从Spark 1.3开始就有的,它是一种以RDD为基础的分布式无类型数据集,它的出现大幅度降低了普通Spark用户的学习门槛。
DataFrame类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以解析到具体数据的结构信息,从而对DataFrame中的数据源以及对DataFrame的操作进行了非常有效的优化,从而大幅提升了运行效率。
DataSetAPI是从1.6版本提出的,在Spark 2.2的时候,DataSet和DataFrame趋于稳定,可以投入生产环境使用。与DataFrame不同的是,DataSet是强类型的,而DataFrame实际上就是DataSet[Row](也就是Java中的DataSet<Row>)。
DataSet是Lazy级别的,Transformation级别的算子作用于DataSet会得到一个新的DataSet。当Action算子被调用时,Spark的查询优化器会优化Transformation算子形成的逻辑计划,并生成一个物理计划,该物理计划可以通过并行和分布式的方式来执行。
反观RDD,由于无从得知其中数据元素的具体内部结构,故很难被Spark本身自行优化,对于新手用户很不友好,但是,DataSet底层是基于RDD的,所以最终的优化尽头还是对RDD的优化,这就意味着优化引擎不能自动优化的地方,用户在RDD上可能有机会进行手动优化。
1.2.1 通过DataFrame实战电影点评系统案例
现在我们通过实现几个功能来了解DataFrame的具体用法。先来看第一个功能:通过DataFrame实现某部电影观看者中男性和女性不同年龄分别有多少人。
1. println("功能一:通过DataFrame实现某部电影观看者中男性和女性不同年龄人数") 2. //首先把Users的数据格式化,即在RDD的基础上增加数据的元数据信息 3. val schemaForUsers = StructType( 4. "UserID::Gender::Age::OccupationID::Zip-code".split("::") 5. .map(column => StructField(column, StringType, true))) 6. //然后把我们的每一条数据变成以Row为单位的数据 7. val usersRDDRows = usersRDD 8. .map(_.split("::")) 9. .map(line => 10. Row(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim)) 11. //使用SparkSession的createDataFrame方法,结合Row和StructType的元数据信息 //基于RDD创建DataFrame,这时RDD就有了元数据信息的描述 12. val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaForUsers) 13. //也可以对StructType调用add方法来对不同的StructField赋予不同的类型 14. val schemaforratings = StructType("UserID::MovieID".split("::"). 15. map(column => StructField(column, StringType, true))) 16. .add("Rating", DoubleType, true) 17. .add("Timestamp",StringType, true) 18. 19. val ratingsRDDRows = ratingsRDD 20. .map(_.split("::")) 21. .map(line => 22. Row(line(0).trim,line(1). trim,line(2).trim.toDouble,line(3).trim)) 23. val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows, schemaforratings) 24. //接着构建movies的DataFrame 25. val schemaformovies = StructType("MovieID::Title::Genres".split("::") 26. .map(column => StructField(column, StringType, true))) 27. val moviesRDDRows = moviesRDD 28. .map(_.split("::")) 29. .map(line => Row(line(0).trim,line(1).trim,line(2).trim)) 30. val moviesDataFrame = spark.createDataFrame(moviesRDDRows, schemaformovies) 31. 32. //这里能够直接通过列名MovieID为1193过滤出这部电影,这些列名都是在上面指定的 33. ratingsDataFrame.filter(s" MovieID = 1193") 34. //Join的时候直接指定基于UserID进行Join,这相对于原生的RDD操作而言更加方便快捷 35. .join(usersDataFrame, "UserID") 36. //直接通过元数据信息中的Gender和Age进行数据的筛选 37. .select("Gender", "Age") 38. //直接通过元数据信息中的Gender和Age进行数据的groupBy操作 39. .groupBy("Gender", "Age") 40. //基于groupBy分组信息进行count统计操作,并显示出分组统计后的前10条信息 41. .count().show(10)
最终打印结果如图1-6所示,类似一张普通的数据库表。
图1-6 电影观看者中男性和女性人数
上面案例中的代码无论是从思路上,还是从结构上都和SQL语句十分类似,下面通过写SQL语句的方式来实现上面的案例。
1. println("功能二:用LocalTempView实现某部电影观看者中不同性别不同年龄分别有多少 人?") 2. //既然使用SQL语句,那么表肯定是要有的,所以需要先把DataFrame注册为临时表 3. ratingsDataFrame.createTempView("ratings") 4. usersDataFrame.createTempView("users") 5. //然后写SQL语句,直接使用SparkSession的sql方法执行SQL语句即可 6. val sql_local = "SELECT Gender, Age, count(*) from users u join 7. ratings as r on u.UserID = r.UserID where MovieID = 1193 group by Gender, Age" 8. spark.sql(sql_local).show(10)
这样我们就可以得到与上面案例相同的结果,这对写SQL比较多的用户是十分友好的。但是有一个问题需要注意,这里调用createTempView创建的临时表是会话级别的,会话结束时这个表也会消失。那么,怎么创建一个Application级别的临时表呢?可以使用createGlobalTempView来创建临时表,但是这样就要在写SQL语句时在表名前面加上global_temp,例如:
1. ratingsDataFrame.createGlobalTempView("ratings") 2. usersDataFrame.createGlobalTempView("users") 3. 4. val sql = "SELECT Gender, Age, count(*) from global_temp.users u join global_temp.ratings as r on u.UserID = r.UserID where MovieID = 1193 group by Gender, Age" 5. spark.sql(sql).show(10)
第一个DataFrame案例实现了简单的类似SQL语句的功能,但这是远远不够的,我们要引入一个隐式转换来实现复杂的功能:
1. import spark.sqlContext.implicits._ 2. ratingsDataFrame.select("MovieID", "Rating") 3. .groupBy("MovieID").avg("Rating") 4. //接着我们可以使用“$”符号把引号里的字符串转换成列来实现相对复杂的功能,例如,下面 //我们把avg(Rating)作为排序的字段降序排列 5. .orderBy($"avg(Rating)".desc).show(10)
从图1-7的结果可以看到,求平均值的那一列列名和在SQL语句里使用函数时的列名一样变成了avg(Rating),程序中的orderBy里传入的列名要和这个列名一致,否则会报错,提示找不到列。
图1-7 电影系统SQL运行结果
有时我们也可能会在使用DataFrame的时候在中间某一步转换到RDD里操作,以便实现更加复杂的逻辑。下面来看一下DataFrame和RDD的混合编程。
1. ratingsDataFrame.select("MovieID", "Rating") 2. .groupBy("MovieID").avg("Rating") 3. //这里直接使用DataFrame的rdd方法转到RDD里操作 4. .rdd.map(row =>(row(1),(row(0), row(1)))) 5. .sortBy(_._1.toString.toDouble, false) 6. .map(tuple => tuple._2) 7. .collect.take(10).foreach(println)
1.2.2 通过DataSet实战电影点评系统案例
前面提到的DataFrame其实就是DataSet[Row],所以只要学会了DataFrame的使用,就可以快速接入DataSet,只不过在创建DataSet的时候要注意与创建DataFrame的方式略有不同。DataSet可以由DataFrame转换而来,只需要用yourDataFrame.as[yourClass]即可得到封装了yourClass类型的DataSet,之后就可以像操作DataFrame一样操作DataSet了。接下来我们讲一下如何直接创建DataSet,因为DataSet是强类型的,封装的是具体的类(DataFrame其实封装了Row类型),而类本身可以视作带有Schema的,所以只需要把数据封装进具体的类,然后直接创建DataSet即可。
首先引入一个隐式转换,并创建几个caseClass用来封装数据。
1. import spark.implicits._ 2. case class User(UserID:String, Gender:String, Age:String, OccupationID: String, Zip_Code:String) 3. case class Rating(UserID:String, MovieID:String, Rating:Double, Timestamp:String) 4. 然后把数据封装进这些Class: 5. val usersForDSRDD = usersRDD.map(_.split("::")).map(line => 6. User(line(0).trim,line(1).trim,line(2).trim,line(3).trim,line(4).trim)) 7. 最后直接创建DataSet: 8. val usersDataSet = spark.createDataset[User](usersForDSRDD) 9. usersDataSet.show(10) 10.
电影系统运行结果如图1-8所示,列名为User类的属性名。下面使用同样的方法创建ratingsDataSet并实现一个案例:找出观看某部电影的不同性别不同年龄的人数。
1. val ratingsForDSRDD = ratingsRDD.map(_.split("::")).map(line => 2. Rating(line(0).trim,line(1).trim,line(2).trim.toDouble,line(3).trim)) 3. val ratingsDataSet = spark.createDataset(ratingsForDSRDD) 4. //下面的实现代码和使用DataFrame方法几乎完全一样(把DataFrame换成DataSet即可) 5. ratingsDataSet.filter(s" MovieID = 1193").join(usersDataSet, "UserID") 6. .select("Gender", "Age").groupBy("Gender", "Age").count() 7. .orderBy($"Gender".desc,$"Age").show()
观看电影性别、年龄统计结果如图1-9所示。
图1-8 电影系统运行结果
图1-9 观看电影性别、年龄统计结果
当然,也可以把DataFrame和DataSet混着用(这样做会导致代码混乱,故不建议这样做),得到的结果完全一样。
最后根据源码,有几点需要补充:
RDD的cache方法等于MEMORY_ONLY级别的persist,而DataSet的cache方法等于MEMORY_AND_DISK级别的persist,因为重新计算的代价非常昂贵。如果想使用其他级别的缓存,可以使用persist并传入相应的级别。
RDD.scala源码:
1. /** 2. *使用默认的存储级别持久化RDD (`MEMORY_ONLY`). 3. */ 4. def cache(): this.type = persist()
Dataset.scala源码:
1. /** 2. *使用默认的存储级别持久化DataSet (`MEMORY_AND_DISK`). 3. * 4. * @group basic 5. * @since 1.6.0 6. */ 7. def cache(): this.type = persist()
基于DataSet的计算会像SQL一样被Catalyst引擎解析生成执行查询计划,然后执行。我们可以使用explain方法来查看执行计划。