评分最高的电影
package com.bj.scalacode
import org.apache.spark.SparkConf
import org.apache.spark.sql.Sparksession
/**
* 平均评分最高的10部电影
*/
object RDD_Movie_Users_Analyzer2 {
def main(args: Array[String]): Unit = {
var dataPath="F:\\baidu\\ml-1m"
//配置SparkConf,这里指本地运行,并把程序的名字设置为RDD_Movie_Users_Analyzer2
val conf=new SparkConf().setMaster("local[*]").setAPPName("RDD_Movie_Users_Analyzer2")
//Spark2.0 引入SparkSession 封装了Sparkcontext 和SQLContext ,并且会在
//builder 的getOrCreate 方法中判断是否有符合要求的SparkSession 存在,有则使用,没有则进行创建
val spark=SparkSession.builder().config(conf).getOrCreate()
//获取SparkSession 的SparkContext
val sc=spark.sparkContext
//把Spark 程序运行时的日志设置为warn 级别,以方便查看运行结果
sc.setLogLevel("warn")
//把用到的数据加栽进来转换为RDD ,此时使用sc.textFile 并不会读取文件,而是标记了有这个操作,遇到Action 级别算子时才会真正去读取文件
val usersRDD=sc.textFile(dataPath+"/users.dat")
val moviesRDD=sc.textFile(dataPath+"/movies.dat")
val ratingsRDD=sc.textFile(dataPath+"/ratings.dat")
//具体的数据处理业务逻辑
//打印出所有电影中评分最高的前10 个电影名和平均评分
println("所有电影中评分最高(口碑最好)的前10个电影名和平均评分:")
//第一步:
//,从moviesRDD 中取出MovieID 和Name,如果后面的代码重复使用这些数据,则可以把它们缓存起来。
// 首先把使用map 算子上面的RDD 中的每一个元素(即文件中的每一行)以"::" 为分隔符进行拆分,
// 然后再使用map算子从拆分后得到的数组中取出需要用到的元素,并把得到的RDD 缓存起来。
//取出MovieID 和Name
val movieinfo = moviesRDD.map(x=>(x.split("::"))).map(x=>(x(0),x(1))).cache()
//从ratingsRDD中取出UserID,MovieID 和rating
val ratings = ratingsRDD.map(_.split("::")).map(x=>(x(0),x(1),x(2),x(3))).cache()
//第二步:
//从ratings 的数据中使用map 算子获取到形如(movieID,(rating,1)格式的RDD,
//然后使用reduceByKey 把每个电影的总评分以及点评人数算出来。
//ratings.map(x=>(x._1,x._2,x._3,x._4)).take(10).foreach(println)//(1,1193,5,978300760)
//ratings.map(x=>(x._2,(x._3,1))).take(10).foreach(println)//(1193,(5,1))
//此时得到的RDD 格式为(movieID,(Sum(ratings ),Count(ratings)))(1380,(2923.0,817))
val moviesAndRatings = ratings.map(x => (x._2, (x._3.toDouble, 1)))
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))//(2828,(255.0,121))
//第三步:得到格式为(movieID,(Sum(ratings ),Count(ratings))),
// 把每个电影的Sum(ratings)和 Count(ratings)相除,得到包含了电影ID 和平均评分的RDD:
val avgRatings = moviesAndRatings.map(x=>(x._1,x._2._1.toDouble/x._2._2))
// avgRatings.foreach(println)//得到包含了电影ID 和平均评分的RDD :(2834,3.5555555555555554)
//第四步:把avgRatings 与movielnfo 通过关键字key(movieID)连接到一起,得到形如(movieID,(MovieName,AvgRating)) 的RDD ,
// 然后格式化为( AvgRating,MovieName ),并按照key (也就是平均评分)降序排列,最终取出前10 个并打印出来。
avgRatings.join(movieinfo).map(item=>(item._2._1,item._2._2))
.sortByKey(false).take(10).foreach(record=>println(record._2+"平均评分为:"+record._1))
// avgRatings.join(movieinfo).map(item=>(item._2._1,item._2._2)).take(1).foreach(println)
// //(2.1074380165289255,Dudley Do-Right (1999))
// println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
// avgRatings.join(movieinfo).map(item=>(item._1,item._2,item._2._2,item._2._1)).take(1).foreach(println)
// // (2828,(2.1074380165289255,Dudley Do-Right (1999)),Dudley Do-Right (1999),2.1074380165289255)
// println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
//
// avgRatings.take(1).foreach(println)
// //(2828,2.1074380165289255)
// println("#################################################################################")
// movieinfo.take(1).foreach(println)
//(1,Toy story (1995))
//最后关闭SparkSession
sc.stop()
}
}
所有电影中评分最高(口碑最好)的前10个电影名和平均评分:
Schlafes Bruder (Brother of Sleep) (1995)平均评分为:5.0
Gate of Heavenly Peace, The (1995)平均评分为:5.0
Lured (1947)平均评分为:5.0
Bittersweet Motel (2000)平均评分为:5.0
Follow the Bitch (1998)平均评分为:5.0
Song of Freedom (1936)平均评分为:5.0
One Little Indian (1973)平均评分为:5.0
Baby, The (1973)平均评分为:5.0
Smashing Time (1967)平均评分为:5.0
Ulysses (Ulisse) (1954)平均评分为:5.0
相关阅读
A5创业网(公众号:iadmin5)9月19日报道,熟悉在网上购买电影票的用户都知道,电影票基本上是“不退不改”或“只改不退&rd
随着时间的推移,网贝兼者们所熟知各大引流平台管理是越来越严格。造成了很多网贝兼者们不得不寻找新的引流处女地。今天朱海涛在这
A5创业网(公众号:iadmin5)消息,豆瓣电影官方微博最近晒出起诉状,以名誉权纠纷为由起诉《纯洁心灵·逐梦演艺圈》导演毕志飞及
都知道淘宝店铺DSR评分是不仅是决定自然搜索权重的因素之一,还是淘宝报名活动的一道门槛。如果DSR评分高,说明买家对你店铺的信任度
A5创业网(公众号:iadmin5)1月7日讯,据相关媒体报道称,苹果三星达成合作关系,不久,三星智能电视的用户将能够访问iTunes电影和电视内容