还剩11页未读,继续阅读
本资源只提供10页预览,全部文档请下载后查看!喜欢就下载吧,查找使用更方便
文本内容:
Spark大数据分析与西安电子科技大课程名称Spark大数据分析选用教材出版社学出版社实战(第2版)借助成熟的Spark RDD项目分析交通3Spark RDD教学内容授课学时早p技术,分析交通违章记录违章记录文件中的数据授课班级授课日期授课地点****专*****班
(1)了解RDD的特性及运算的原理,了解RDD的执行流程;
(2)熟悉各种数据源创建RDD的算子,多种方法查看RDD的元素教学目标
(2)熟练使用算子完成RDD的转换、排序、过滤、去重等操作;
(3)能够完成键值对RDD的生成、转换等操作;
(4)根据业务需求,能将RDD中数据输出到文件系统中
(1)RDD的生成(内存数据、文件等生成)
(2)RDD的map、filter sortBy等常用算子;重点难点
(3)键值对RDD的key、value相关操作,键值对RDD排序等;
(4)两个RDD的相关操作:join、union、zip等团讲授口讨论或座谈口问题导向学习口分组合作学习口案例教学口任务驱动教学方法团项目教学口情景教学口演示汇报口实践教学□参观访问口引导文教学口其他(--)
(1)教材《Spark大数据分析与实战(第2版)》教学准备(教
(2)硬件设备内存8G(或以上)的计算机师)
(2)教学资源课件PPT、教学日历、相关软件等
(1)教材《Spark大数据分析与实战(第2版)》教学准备(学
(2)硬件设备内存8G(或以上)的计算机生)
(3)教学资源课件PPT、相关软件等教学内容与过程教学环节(教学内容、教学方法、组织形式、教学手段)教师通过课程教学平台或班级群发布学习预习任务及课程资源;学生提前预习相关内容,并完成课前组织课前自测等scala valrdd4=rddl.rightOuterJoinrdd2//两个RDD左连接rdd4:org.apache,spark,rdd.RDD[String,Option[Int],Int]二MapPartitionsRDD
[18]atrightOuterJoin at console:26scala rdd
4.collect res8:Array[String,Option[Int],Int]=Arraytom,Some1,5,apple,None,2,jerry,Some2,6任务
3.6将违章处理的结果存储到外部文件
1.CSV文件的读写CSV commaseparated values逗号分隔值、TSV tabseparated values制表符分割值文件是两种常见的文件格式,其读取方式与普通文本文件基本一致csv文件的每行数据以英文“,”分割,可以使用记事本、Excel等打开scala valcsvRDD=sc.textFilecsvPath〃读取csv文件,生成RDD;文件的每一行,成为RDD的一个元素csvRDD:org.apache,spark,rdd.RDD[String]=file:///home/hadoop/data/author,csv MapPartitionsRDD
[1]at textFileat console:
262.读写Sequence文件Sequence格式较为特殊,只有键值对形式的数据才可以保存为SequenceFile格式SequenceFile可以对数据进行压缩可以逐条压缩,也可以压缩整个数据块,默认情况下不启用压缩下面的代码中,rdd保存了键值对数据,可以使用saveAsSequenceFile将其中数据保存为SequenceFile文件
3.Spark RDD的执行流程
4.RDD的依赖关系RDD的每次转换操作都会产生一个新的RDD,那么前后RDD之间便形成了一定的依赖关系;RDD中的依赖关系分为窄依赖Narrow Dependency与觉依赖Wide Dependency,图3T7展示了两种依赖之间的区别ide Dependencies:join withinputs notco-partitioned1\arrow Dependencies:Spark RDD可以由内存数据生成,也可以读取文本文件、JSON文件、CSV文件或者HBASE数据库等生成Spark RDD的操作报告转换操作和行动操作两大类,其中转换操作主要有一个RDD总结评价生成一个新的RDD包括map、flatMap filterjoin等,而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际map,filter groupByKey的计算报告count、first、collect等;通过组合使用RDD算子,可以完成大数据分析的工作join withinputsco-partitionedunion任务
3.1根据交通违章数据创建RDDL认识RDDRDD就是一个分布在集群多节点中存放数据的集合;虽然一个数据集分散于集群多个节点,但逻辑上仍然是一个整体(即RDD),数据处理人员只需对这个整体进行处理,而无需关注底层逻辑与实现方法,从而极大降低了大数据编程的难度其计算流程如下
2.内存数据创建RDD针对程序中已有的数据集合List、Array Tuple等,Spark提供了两个方法课程内容描parallelize和makeRDD,它们均可复制数据集合的元素后,创建一个可并行计算的分布式数据集述RDDoparallelize方式适用于做简单的Spark程序测试、Spark学习;下面演示根据列表数据创建RDD scala val nums=List1,2,3,4,5//包含5个整数的列表nums:List[Int]=List1,2,3,4,5scala valnumsRDD=sc.parallelizenums//根据列表nums,创建一个RDDnumsRDD numsRDD:org.apache,spark,rdd.RDD[Int]=ParallelCollectionRDD
[1]at parallelizeatconsole:26scala valcars=Array〃比亚迪〃,〃长安〃,〃奇瑞〃,〃广汽〃cars:Array[String]=Array比亚迪,长安,奇瑞,广汽scala valcarsRDD=sc.parallelize cars//根据数组cars,创建一个RDD carsRDDcarsRDD:org.apache.spark,rdd.RDD[String]=ParallelCollectionRDD
[2]at parallelizeatconsole:
263.外部文件创建RDD由文件创建RDD,采用sc.textFile“文件路径”方式,路径前面需要加入“file://”以表示本地文件Spark-shell环境下,要求所有节点的相同位置均保存该文件现有本地文件u/home/hadoop/data/guide.txtn,借助textFile方法,可以生成RDD,演示代码如下scala valf ileRDD=sc.textFile z,file:///home/hadoop/data/guide.txt〃〃注意路径的写法fileRDD:org.apache,spark,rdd.RDD[String]=file:///home/hadoop/data/guide.txtMapPartitionsRDD
[11]at textFileat console:25scala fileRDD.count〃使用count方法查看RDD的元素数量,即guide.txt文件的行数resl4:Long=44务
3.2找出扣分最高的交通违法条目
1.查看RDD的元素在学习或测试代码时,为了便于掌控计算过程、及时发现问题,可以使用collect操作查看RDD内元素的值;collect操作会将RDD的所有元素组成一个数组并返回给Driver端;其用法示例如上_____________________________________________________________________________________scala valnums=List1,2,3,4,5nums:List[Int]=List1,2,3,4,5scala valnumsRDD=sc.parallelizenums//根据列表nums,创建RDDnumsRDD:org.apache,spark,rdd.RDD[Int]=ParallelCollectionRDD
[4]at parallelizeat console:26scala numsRDD.collect〃查看RDD的元素值res5:Array[Int]=Array1,2,3,4,
52.Map操作map操作是最常用的转换操作,该操作接收一个函数作为参数,进而将RDD中的每个元素作为参数传入某个函数,函数处理完后的返回值组成一个新的RDD;其目的是根据现有的RDD,经过函数处理,最终得到一个新的RDD用法示例如下:__________________________________________________scala valdata=List1,2,3,4,5,6data:List[Int]=List1,2,3,4,5,6scala valdataRDD=sc.parallelizedatadataRDD:org.apache,spark,rdd.RDD[Int]=ParallelCollectionRDD
[10]at parallelizeat console:26scala valnewDataRDD=dataRDD.mapx=x*2newDataRDD:org.apache,spark.rdd.RDD[Int]=MapPartitionsRDD
[11]at mapat console:25scala newDataRDD.collectreslO:Array[Int]=Array2,4,6,8,10,12scala valpeoples=List tom,jerry”,〃petter〃,〃ken〃peoples:List[String]=Listtom,jerry,petter,kenscala valpeoplesRDD=sc.makeRDDpeoplespeoplesRDD:org.apache,spark,rdd.RDD[String]=Parallel CollectionRDD
[7]at makeRDD atconsole:26scala valnewPeoplesRDD=peoplesRDD.mapx=x.toUpperCase newPeoplesRDD:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD
[8]at mapat console:25scala newPeoplesRDD.collectres8:Array[String]=ArrayTOM,JERRY,PETTER,KEN
3.RDD的排序sortBy操作可以对RDD元素进行排序,并返回排好序的新RDD;sortBy有3个参数,其用法说明如下;________________________________________________________________________________def sortBy[K]f:T=K,ascending:Boolean二true,numPartitions:Int=this,partitions,length:RDD[T]Return thisRDD sortedbythe givenkey function参数1fT=K,左边为要排序的RDD的每一个元素,右边返回要进行排序的值♦参数2ascending可选项,升序或降序排列标识,默认为true、升序排列,若要降序♦排列则需写false参数3numPartitions可选项,排序后新RDD的分区数量,默认分区数量与原RDD相同♦针对某个RDD,将RDD的元素数据交给“f:T=K”函数进行处理;而后按照函数运算后的返回值进行排序,默认为升序排列
4.数值型RDD的统计对于数值元素组成的RDD,Spark提供了max、min、sum等若干统计算子,可以完成简单的统计分析;相关示例如下:________________________________________________________________scala valdata=sc.makeRDDList8,10,7,4,1,9,6,3,5,2data:org.apache,spark.rdd.RDD[Int]=Parallel CollectionRDD
[0]at makeRDD at console:25scala data,max//返回RDD中的最大值res9:Int=10scala data.min//返回RDD中的最小值reslO:Int=1任务
3.3查找某车辆的违章记录
1.filter操作filter是一个转换操作,可用于筛选出满足特定条件元素,返回一个新的RDD;其用法说明如E__________________________________________________________________________________def filterf:T=Boolean:RDD[T]Return anew RDDcontaining onlythe elementsthat satisfya predicate.其应用示例如下:scala valnumsRDD=sc.makeRDD List3,1,2,9,10,5,8,4,7,6numsRDD:org.apache,spark,rdd.RDDLintJ=ParallelCollectionRDD[27J at makeRDDat console:25scala valrdd1二numsRDD.filterx=x%2==0//过滤出偶数元素,组成一个新RDD并返回rddl:org.apache,spark.rdd.RDD[Int]=MapPartitionsRDD
[28]at filterat console:25scala rddl.collect res5:Array[Int]=Array2,10,8,4,6scala valtextsRDD=sc.makeRDDList/ZI like Spark”,〃He likeHadoop,She likeSparktextsRDD:org.apache,spark,rdd.RDD[String]=ParallelCollectionRDD
[29]at meikeRDDat console:26scala valrdd2=textsRDD.filterx=x.contains〃Spark〃〃过滤出含有字符串“Spark”的元素rdd2:org.apache,spark.rdd.RDD[String]=MapPartitionsRDD
[30]at filterat console:25scala rdd
2.collect res6:Array[String]二Array IlikeSpark,She likeSpark
2.distinct操作RDD的元素可能存在重复情况,当我们需要去掉重复元素时,可以使用distinct方法scala valdataRDD=sc.makeRDDList3,5,7,3,4,8,5//dataRDD内有重复元素
3、5dataRDD:org.apache,spark,rdd.RDD[Int]=ParallelCollectionRDD[Oj at makeRDDat console:25scala valnewDataRDD=dataRDD.distinct〃去除重复元素newDataRDD:org.apache,spark.rdd.RDD[Int]=MapPartitionsRDD
[3]at distinctat console:25scala newDataRDD.collect〃检查是否成功去重res2:Array[Int]=Array4,8,5,3,
73.union等操作1union方法可将两个RDD的元素合并为一个新RDD,即得到两个RDD的并集2intersect ion可以求两个RDD的交集,即两个RDD的相同元素3类似于数学中集合的差集运算,可以使用subtract来求两个RDD的差集4cartesian用于求两个RDD的笛卡尔积,将两个集合元素组合成一个新的RDD任务
3.4查找违章次数3次以上车辆
1.键值对RDD键值对RDD PairRDD是指每个RDD元素都是Key,Value键值类型即二元组;普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对下面代码中,我们首先定义一个列表scores,scores的每个元素为二元组,记录学生的姓名及考试成绩;接下来,使用parallelize方法生成键值对RDD scoresRDD,scoresRDD元素的类型为二元组scala valscores=List〃张小帅,84,〃孙田〃,80,〃马莉〃,92“scores的元素为二元组,例如〃张小帅〃,84scores:List[String,Int]=List张小帅,84,孙田,80,马莉,92scala valscoresRDD=sc.parallelize scores〃scoresRDD即为键值对RDDscoresRDD:org.apache,spark,rdd.RDD[String,Int]=ParallelCollectionRDD
[0]at parallelizeat console:26scala scoresRDD.collectO〃scoresRDD的元素为二元组res2:Array[String,Int]二Array张小帅,84,孙田,80,马莉,
922.Lookup查找value键值对RDD的元素为key,value形式的二元组,keys操作可以获取键值对RDD中所有的key,组成一个新的RDD并返回;values操作会把键值对RDD中的所有value返回,形成一个新的RDD;两个操作的用法示例如下:________________________________________________________________scala valdata=ListSpark”,1,〃Hadoop〃,2,〃Flink〃,3,〃kafka〃,4data:List[String,Int]=ListSpark,1,Hadoop,2,Flink,3,kafka,4scala valpairRDD=sc.makeRDDdatapairRDD:org.apache,spark.rdd.RDD[String,Int]=ParallelCollectionRDD
[9]at makeRDDat console:26scala valkeysRDD=pairRDD.keys〃获取所有的key,组成新RDDkeysRDD:org.apache.spark.rdd.RDD[String]二MapPartitionsRDD
[10]at keysat console:25scala keysRDD.collectres6:Array[String]=Array Spark,Hadoop,Fl ink,kafkascala valvaluesRDD=pairRDD.values〃获取所有的value,组成新的RDDvaluesRDD:org.apache.spark,rdd.RDD[Int]=MapPartitionsRDD
[12]at valuesat console:25scala valuesRDD.collectOres7:Array[Int]=Array1,2,3,
43.ByKey相关操作对于键值对RDD,Spark提供了groupByKey sortByKeyreduceByKey等若干ByKey相关操作;其中,groupByKey是根据key值,对value进行分组;用法演示如下fruits:List[String,Double]=List apple,
5.5,orange,
3.0,apple,
8.2,banana,
2.7,orange,
4.2scala valfruitsRDD=sc.makeRDDfruitsfruitsRDD:org.apache,spark.rdd.RDD[String,Double]=ParallelCollectionRDD
[23]at makeRDDatconsole:26scala valgroupedRDD=fruitsRDD.groupByKey//按照Key,对value进行分组groupedRDD:org.apache,spark.rdd.RDD[String,Iterable[Double]]=ShuffledRDD
[25]at groupByKeyatconsole:25scala groupedRDD.collect res22:Array[String,Iterable[Double]]=Arraybanana,CompactBuffer
2.7,orange,CompactBuffer
3.0,
4.2,apple,CompactBuffer⑸5,
8.
24.mapValue操作实际业务中,可能遇到只对键值对RDD的value部分进行处理,而保持value不变的需求;这时,可以使用mapValues func,它的功能是将RDD元组中的value交给函数func处理任务
3.5查找累计扣12分以上车辆信息
1.zip操作将两个RDD组合成键值对RDD除了使用makeRDD等方式创建键值对RDD,还可以使用zip操作亦称为“拉链操作”将两个元素数量相同、分区数相同的普通RDD组合成一个键值对RDD下面代码中,rddl由3个元素分区数量默认,rdd2也有3个元素;代码rddl.zip rdd2将前述两个RDD组合成一个键值对新的RDD_____________________________________________________________________________scala valrddl二sc.makcRDDList〃东岳〃,〃西岳”,〃南岳〃,〃北岳〃,〃中岳〃rddl:org.apache,spark,rdd.RDD[String]=ParallelCollcctionRDD
[1]atmakeRDDatconsole:25scala valrdd2=sc.makeRDDList〃泰山〃,〃华山,〃衡山,〃恒山山〃嵩山”rdd2:org.apache,spark.rdd.RDD[String]=ParallelCollectionRDD
[2]atmakeRDDatconsole:25scala rddl.ziprdd
2.collect〃rddl、rdd2组成一个键值对RDD,并输出其元素res3:Array[String,String]二Array东岳,泰山,西岳,华山,南岳,衡山,北岳,恒山,中岳,嵩山2join连接两个RDDjoin概念来自于关系数据库领域,Spark RDD中的join的类型也包括内连接join、左外连接leftOuteiJoin、右外连接rightOuterJoin等其中,join是对于给定的两个键值对RDD数据类型为K,V1和K,V2,只有两个RDD中都存在的Key才会被输出,最终得到一个K,V1,V2类型的RDD;其用法示例如下:______________________________________________scala rddl.joinrdd
2.collectres5:Array[String,Int,Int]=Array tom,1,5,jerry,2,6scalavalrdd3=rddl.joinrdd2//rddK rdd2中有相同的Keytom、jerryrdd3:org.apache,spark,rdd.RDD[String,Int,Int]=MapPartitionsRDD
[13]at joinatconsole:263其他连接rightOuterJoin类似于SQL中的右外关联right outerjoin,根据两个RDD的Key进行右连接,返回结果以右边第二个的RDD为主,关联不上的记录为空None值leftOuteiJoin类似于SQL中的左外关联left outerjoin,可以根据两个RDD的Key进行左连接,返回结果以左边第一个的RDD为主,关联不上的记录为空None值;其用法示例如下。
个人认证
优秀文档
获得点赞 0