Spart DataSet数据集
时间:2022-05-07
本文章向大家介绍Spart DataSet数据集,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
]Spark引入DataFrame, 它可以提供high-level functions让Spark更好的处理结构数据的计算。 这让Catalyst optimizer 和Tungsten(钨丝) execution engine自动加速大数据分析。 发布DataFrame之后开发者收到了很多反馈, 其中一个主要的是大家反映缺乏编译时类型安全。 为了解决这个问题,Spark采用新的Dataset API (DataFrame API的类型扩展)。 Dataset API扩展DataFrame API支持静态类型和运行已经存在的Scala或Java语言的用户自定义函数。 对比传统的RDD API,Dataset API提供更好的内存管理,特别是在长任务中有更好的性能提升
SparkDatasets.png
#创建DataSet
case class Data(a: Int, b: String)
val ds = Seq(Data(1, "one"), Data(2, "two")).toDS()
ds.collect()
ds.show()
#创建DataSet
case class Person(name: String, zip: Long)
val df = sqlContext.read.json(sc.parallelize("""{"zip": 94709, "name": "Michael"}""" :: Nil))
df.as[Person].collect()
df.as[Person].show()
#DataSet的WordCount
import org.apache.spark.sql.functions._
val ds = sqlContext.read.text("hdfs://node-1.sxt.cn:9000/wc").as[String]
val result = ds.flatMap(_.split(" ")).filter(_ != "").toDF().groupBy($"value").agg(count("*") as "numOccurances").orderBy($"numOccurances" desc)
val wordCount = ds.flatMap(_.split(" ")).filter(_ != "").groupBy(_.toLowerCase()).count()
#创建DataSet
val lines = sqlContext.read.text("hdfs://node-1.sxt.cn:9000/wc").as[String]
#对DataSet进行操作
val words = lines.flatMap(_.split(" ")).filter(_ != "")
#查看DataSet中的内容
words.collect
words.show
#分组求和
val counts = words.groupBy(_.toLowerCase).count()
--------------------------------------------------------------------------------------------------------------
{"name": "UC Berkeley", "yearFounded": 1868, "numStudents": 37581}
{"name": "MIT", "yearFounded": 1860, "numStudents": 11318}
#向hdfs中上传数据:/usr/local/hadoop-2.6.4/bin/hdfs dfs -put schools.json /
#定义case class
case class University(name: String, numStudents: Long, yearFounded: Long)
#创建DataSet
val schools = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/schools.json").as[University]
#操作DataSet
schools.map(sc => s"${sc.name} is ${2015 - sc.yearFounded} years old").show
#JSON -> DataFrame
val df = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/person.json")
df.where($"age" >= 20).show
df.where(col("age") >= 20).show
df.printSchema
#DataFrame -> Dataset
case class Person(age: Long, name: String)
val ds = df.as[Person]
ds.filter(_.age >= 20).show
// Dataset -> DataFrame
val df2 = ds.toDF
import org.apache.spark.sql.types._
df.where($"age" > 0).groupBy((($"age" / 10) cast IntegerType) * 10 as "decade").agg(count("*")).orderBy($"decade").show
ds.filter(_.age > 0).groupBy(p => (p.age / 10) * 10).agg(count("name")).toDF().withColumnRenamed("value", "decade").orderBy("decade") .show
val df = sqlContext.read.json("hdfs://node-1.sxt.cn:9000/student.json")
case class Student(name: String, age: Long, major: String)
val studentDS = df.as[Student]
studentDS.select($"name".as[String], $"age".as[Long]).filter(_._2 > 19).collect()
studentDS.groupBy(_.major).count().collect()
import org.apache.spark.sql.functions._
studentDS.groupBy(_.major).agg(avg($"age").as[Double]).collect()
case class Major(shortName: String, fullName: String)
val majors = Seq(Major("CS", "Computer Science"), Major("Math", "Mathematics")).toDS()
val joined = studentDS.joinWith(majors, $"major" === $"shortName")
joined.map(s => (s._1.name, s._2.fullName)).show()
joined.explain()
- 微信小程序,让生活不一样
- rsync+inotify实时同步环境部署记录
- 常用rsync命令操作梳理
- 无人驾驶系列——深度学习笔记:Tensorflow基本概念
- Android Fragment应用实战
- c#4.0中的不变(invariant)、协变(covariant)、逆变(contravariant)小记
- 用于.NET的可移植HTTP客户端
- 我是如何利用CSRF Get DedeCms Shell的
- asp.net webform中submit按钮使用不当很容易犯的一个错误
- 使用代码为textview设置drawableLeft
- 使用 ASP.NET Web API 构建超媒体 Web API
- Robert Xiao:下一个触点在哪里?
- 使用脚本操作UpdatePanel中控件的问题
- Gerrit上分支操作记录(创建分支、删除分支)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 如何在树莓派4B上安装EMQ X Broker
- SQL | SQL 必知必会笔记 (二)
- 基于桶子法实现的两种排序算法
- Notes | 微观经济学课堂笔记(一)
- 将终结点图添加到你的ASP.NET Core应用程序中
- Stata | 爬取 CFPS 文献传送门并制作成 Markdown
- 委托的好处
- Elasticsearch安装和配置
- Notes | QUAIDS 模型
- Stata | 520,听说你也想快点找到...
- Stata | 批量替换变量值的小技巧
- 手把手教你完成课设作业使用Pandas对海平面温度异常进行分析,小白也能看的懂
- 谈谈自学 Stata 的体会
- Latex修改字体字号的大小
- Notes | GitHub Upload Large Files