BigData--大数据技术之SparkSQL
时间:2022-07-25
本文章向大家介绍BigData--大数据技术之SparkSQL,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
一、Spark SQL概述
1、DataFrame
与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
2、DataSet
- 1)是
Dataframe API
的一个扩展,是Spark最新的数据抽象。 - 2)用户友好的API风格,既具有类型安全检查也具有Dataframe的查询优化特性。
- 3)Dataset支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
- 4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称。
- 5) Dataframe是Dataset的特列,
DataFrame=Dataset[Row]
,所以可以通过as方法将Dataframe转换为Dataset。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。 - 6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].
- 7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。就跟JSON对象和类对象之间的类比。
二、SparkSQL程序
1、user.json
json
{"id" : "1201", "name" : "satish", "age" : "25"}
{"id" : "1202", "name" : "krishna", "age" : "28"}
{"id" : "1203", "name" : "amith", "age" : "39"}
{"id" : "1204", "name" : "javed", "age" : "23"}
{"id" : "1205", "name" : "prudvi", "age" : "23"}
2、基本使用
scala
def main(args: Array[String]): Unit = {
//设置配置
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL")
//创建SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(sparkConf)
.getOrCreate()
//加载json数据
val dataFrame = spark.read.json("data\user.json")
//创建user视图
dataFrame.createOrReplaceTempView("user")
//执行SQL语句,并打印结果
spark.sql("select * from user where age > 25").show()
//关闭
spark.stop
}
3、相互转换
scala
//设置配置
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL")
//创建SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(sparkConf)
.getOrCreate()
//进行转换之前,需要引入隐式转换规则
import spark.implicits._
// 创建RDD
val rdd = spark.sparkContext.makeRDD(List((1, "michong", 20), (2, "qjzxzxd", 21), (3, "米虫", 18)))
// 转换为DF
val df = rdd.toDF("id", "name", "age")
df.show()
// 转换为DS
val ds = df.as[User]
// 转换为DF
val df1 = ds.toDF()
// 转换为RDD
val rdd1 = df1.rdd
rdd1.foreach(row=>{
println(row.getString(1))
})
//释放资源
spark.stop
4、RDD和DataSet之间相互转换
scala
//设置配置
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL")
//创建SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(sparkConf)
.getOrCreate()
//进行转换之前,需要引入隐式转换规则
import spark.implicits._
// 创建RDD
val rdd = spark.sparkContext.makeRDD(List((1, "michong", 20), (2, "qjzxzxd", 21), (3, "米虫", 18)))
val userRDD = rdd.map{
case (id,name,age)=>{
User(id,name,age)
}
}
//RDD转换为DataSet
val userDS = userRDD.toDS()
//Represents the content of the Dataset as an `RDD` of `T`.
val rdd1 = userDS.rdd
rdd1.foreach(println)
//释放资源
spark.stop
5、用户自定义聚合函数
方式一
scala
object hello4 {
def main(args: Array[String]): Unit = {
//设置配置
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL")
//创建SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(sparkConf)
.getOrCreate()
//创建聚合函数
val udaf = new MyAgeAvgFunction
spark.udf.register("avgAge",udaf)
//使用聚合函数
val frame = spark.read.json("data/user.json")
frame.createOrReplaceTempView("user")
spark.sql("select avgAge(age) from user").show
spark.stop
}
}
// 声明用户自定义聚合函数
// 1)继承UserDefinedAggregateFunction
// 2)实现方法
class MyAgeAvgFunction extends UserDefinedAggregateFunction {
// 函数输入的数据结构
override def inputSchema: StructType = {
new StructType().add("age", LongType)
}
// 计算时的数据结构
override def bufferSchema: StructType = {
new StructType().add("sum", LongType).add("count", LongType)
}
// 函数返回的数据类型
override def dataType: DataType = DoubleType
// 函数是否稳定
override def deterministic: Boolean = true
//计算之前的缓冲区的初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// 根据查询结果更新缓冲区的数据
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
// 将多个节点的缓冲区合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
// sum
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
// count
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// 计算
override def evaluate(buffer: Row): Any = {
buffer.getLong(0).toDouble / buffer.getLong(1)
}
}
方式二(强类型)
scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
object hello5 {
def main(args: Array[String]): Unit = {
//设置配置
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL")
//创建SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(sparkConf)
.getOrCreate()
// 引入隐式转换
import spark.implicits._
//创建聚合函数
val udaf = new MyAgeAvgClassFunction
//将聚合函数转化为查询列
val avgCol = udaf.toColumn.name("avgAge")
//使用聚合函数
val frame:DataFrame = spark.read.json("data/user.json")
val userDS :Dataset[UserBean]= frame.as[UserBean]
//应用函数
userDS.select(avgCol).show()
spark.stop
}
}
case class UserBean(name: String, age: BigInt)
case class AvgBuffer(var sum: BigInt, var count: Int)
// 声明用户自定义聚合函数(强类型)
// 1)继承Aggregator
// 2)实现方法
class MyAgeAvgClassFunction extends Aggregator[UserBean, AvgBuffer, Double] {
//初始化
override def zero: AvgBuffer = {
AvgBuffer(0, 0)
}
/**
* 聚合数据
*
* @param b
* @param a
* @return
*/
override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
b.sum = b.sum + a.age
b.count = b.count + 1
b
}
/**
* 缓冲区合并操作
*
* @param b1
* @param b2
* @return
*/
override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
b1.sum = b1.sum + b2.sum
b1.count = b1.count + b2.count
b1
}
/**
* 完成计算
*
* @param reduction
* @return
*/
override def finish(reduction: AvgBuffer): Double = {
reduction.sum.toDouble / reduction.count
}
override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
6、Spark连接MySQL数据库
scala
//设置配置
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL")
//创建SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(sparkConf)
.getOrCreate()
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/qiniuyun?serverTimezone=CTT&useUnicode=true&characterEncoding=UTF8")
.option("dbtable", "myfile")
.option("user", "root")
.option("password", "root")
.load()
// 创建视图
jdbcDF.createOrReplaceTempView("myfile")
// 查询出数据
spark.sql("select * from myfile").show
- 第五章 正则表达式的拆分【修订】
- 仿今天头条加载环境文字闪动效果
- Myexclipse创建Junit测试
- 基于ASP.NET MVC(C#)和Quartz.Net组件实现的定时执行任务调度
- 第五章 正则表达式的拆分
- android 防止反编译的若干方法
- 支持Ajax跨域访问ASP.NET Web Api 2(Cors)的简单示例教程演示
- android 股票K线图
- github搭建个人网站
- Android:一个高效的UI才是一个拉风的UI
- 什么是ORM?为什么用ORM?浅析ORM的使用及利弊
- .NET[C#]中实现实体对象深拷贝(克隆/复制)的几种方法
- Android中图片大小和屏幕密度的关系讲解
- C# WINFORM通过委托和事件窗体间(跨窗体)传值(自定义事件参数)--实例详解
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 错误诊断:索引数据错误导致ORA-00600 [kdsgrp1]处理
- Python让你成为AI 绘画大师,简直太惊艳了!(附代码))
- PostgreSQL全局临时表插件pgtt的使用
- 首次在手机端不牺牲准确率实现BERT实时推理,比TensorFlow-Lite快近8倍,每帧只需45ms
- 【小白学PyTorch】7.最新版本torchvision.transforms常用API翻译与讲解
- 【SOT】siameseFC论文和代码解析
- 基于OpenCV创建视频会议虚拟背景
- 【算法】图文并茂,一文了解 8 种常见的数据结构
- 深度了解特征工程
- Nginx应用场景之虚拟主机
- shell tcping 端口,ping网段所有ip端口或tcping指定IP端口
- Redis | 源码阅读 —— 链表
- Python从入门到熟练(4):基础数据类型
- 【MaskTheFace】给人脸图片戴口罩!
- Python从入门到熟练(5): 数据类型进阶