spark2 sql读取数据源编程学习样例2:函数实现详解
问题导读 1.RDD转换为DataFrame需要导入哪个包? 2.Json格式的Dataset如何转换为DateFrame? 3.如何实现通过jdbc读取和保存数据到数据源? spark2 sql读取数据源编程学习样例1 http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484 这里接着上篇,继续阅读代码,下面我们看看runBasicParquetExample函数的功能实现
runBasicParquetExample函数
[Scala] 纯文本查看 复制代码
?
private def runBasicParquetExample(spark: SparkSession): Unit = {
// $example on:basic_parquet_example$
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// $example off:basic_parquet_example$
}
这里面有一个包的导入
[Scala] 纯文本查看 复制代码
?
import spark.implicits._
Scala中与其它语言的区别是在对象,函数中可以导入包。这个包的作用是转换RDD为DataFrame。
[Scala] 纯文本查看 复制代码
?
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
上面自然是读取json文件。 peopleDF.write.parquet("people.parquet") 这里同样是保存文件,不过people.parquet是文件夹。文件夹里面是数据,其中有*00000*为数据文件。
[Scala] 纯文本查看 复制代码
?
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
这里调用sql语句。
[Scala] 纯文本查看 复制代码
?
namesDF.map(attributes => "Name: " + attributes(0)).show()
这里通过map映射,增加Name:
[Scala] 纯文本查看 复制代码
?
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// $example off:basic_parquet_example$
runParquetSchemaMergingExample函数
[Scala] 纯文本查看 复制代码
?
private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {
// $example on:schema_merging$
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
// $example off:schema_merging$
}
[Scala] 纯文本查看 复制代码
?
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
上面是创建一个RDD,然后通过toDF转换为DataFrame。然后保存到分区目录下。
[Scala] 纯文本查看 复制代码
?
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
创建另外一个DataFrame,并且添加一个新列,删除现有列
[Scala] 纯文本查看 复制代码
?
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
上面自然是读取数据保存为DataFrame,option("mergeSchema", "true"), 默认值由spark.sql.parquet.mergeSchema指定。设置所有的分区文件是否合并Schema。设置后将覆盖spark.sql.parquet.mergeSchema指定值。 runJsonDatasetExample函数
[Scala] 纯文本查看 复制代码
?
private def runJsonDatasetExample(spark: SparkSession): Unit = {
// $example on:json_dataset$
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
// $example off:json_dataset$
}
上面有些代码重复了,就不在解释了。
[Scala] 纯文本查看 复制代码
?
val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
上面是读取json,并显示schema。
[Scala] 纯文本查看 复制代码
?
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
peopleDF.createOrReplaceTempView("people")是DataFrame注册为people表
[Scala] 纯文本查看 复制代码
?
teenagerNamesDF.show()
自然是显示数据。 如下
[Scala] 纯文本查看 复制代码
?
// +------+
// | name|
// +------+
// |Justin|
// +------+
[Scala] 纯文本查看 复制代码
?
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
这里创建一个json格式的dataset
[Scala] 纯文本查看 复制代码
?
val otherPeople = spark.read.json(otherPeopleDataset)
这行代码,是读取上面创建的dataset,然后创建DataFrame。从上面我们看出这也是dataset和DataFrame转换的一种方式。 runJdbcDatasetExample函数
[Scala] 纯文本查看 复制代码
?
private def runJdbcDatasetExample(spark: SparkSession): Unit = {
// $example on:jdbc_dataset$
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// $example off:jdbc_dataset$
}
}
这个是运行Jdbc Dataset的例子。那么如何从jdbc读取数据,是通过下面各个option
[Scala] 纯文本查看 复制代码
?
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
第一行server也就是服务器地址 第二行是表名 第三行是用户名 第四行为密码,相信大家也能看明白。
[Scala] 纯文本查看 复制代码
?
val connectionProperties = new Properties()
Properties这个是用来做什么的那? 我们来看官网
它是 JDBC database 连接的一个参数,是一个字符串tag/value的列表。于是有了下面内容
[Scala] 纯文本查看 复制代码
?
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
我们看到上面放入了用户名和密码
[Scala] 纯文本查看 复制代码
?
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
这里设置了连接url,表名,还有connectionProperties connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING") 上面是指定读取Schema的自定义数据类型。
[Scala] 纯文本查看 复制代码
?
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
上面分别都是将数据通过jdbc保存到数据库
- 智能家居“智商”不够,能靠情商来补吗
- 大话MVP
- AngularJS in Action读书笔记1——扫平一揽子专业术语
- MS Enterprise Library 5.0发布!!
- WCF技术剖析之二十二: 深入剖析WCF底层异常处理框架实现原理[中篇]
- 《Enterprise Library深入解析与灵活应用》博文系列汇总
- 使命必达: 深入剖析WCF的可靠会话[概念篇]
- AngularJS in Action读书笔记2——view和controller的那些事儿
- WCF技术剖析之二十一:WCF基本异常处理模式[中篇]
- 小程序上线“小游戏”,正式引爆3.0社交红利
- 漫谈人工智能机器翻译的前世今生
- 并发与实例上下文模式: WCF服务在不同实例上下文模式下具有怎样的并发表现
- 区块链将变革的五个行业
- WCF技术剖析之二十三:服务实例(Service Instance)生命周期如何控制[上篇]
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android 中ListView和GridView赋值错位
- 谈谈类加载器
- Android中AlarmManager+Notification实现定时通知提醒功能
- Java垃圾回收相关面试题
- Android中backgroundDimEnabled的作用
- 甲基化相关的习题背景补充
- Android 完全退出的实例详解
- Android 双击Back键退出应用的实现方法
- 2020最全Java面试题--基础篇
- Android开发之背景动画简单实现方法
- 15.深入k8s:Event事件处理及其源码分析
- 说一说 HashMap 底层数据结构
- 详解Android Service 使用时的注意事项
- Android自定义View实现QQ运动积分转盘抽奖功能
- 请简述Spring JDBC是如何进行配置的