Delta Lake 学习笔记(三)
文章目录
1 Overview
国际惯例,学习一个新的框架,应该先找找官方文档有没有 QucikStart 之类的文档,尤其像砖厂这种公司,文档应该不会少的。大家可以打开 Delta Lake 官网查看 QuickStart,按照文档迅速过一次。
2 QucikStart 走读
2.1 Set up Apache Spark with Delta Lake
因为要方便跑 demo,我这里选择用 spark-shell 来交互式探索一下 Delta Lake 的功能。
按照文档介绍,Delta Lake 是需要 Spark 2.4.2 或以上版本的,所以大家最好去官网下载一个预先编译的 Spark 包。
按照上图,输入命令 bin/spark-shell --packages io.delta:delta-core_2.12:0.1.0
就可以启动加载了 Delta Lake 的 spark shell 了。
关于 --packages
的用法,是因为 Spark 有个专门解析选项参数的工具叫做 SparkSubmitOptionParser
,他可以解析到依赖,并且先在本地仓库找,没有的话就会根据你的 Maven 配置到远程拉取,这里 Spark 内部做了一些事情。
2.2 Create a table
创建一个 Delta 类型的表方法很简单,如下。
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
然后我们到目录下看看。
➜ delta-table tree
.
├── _delta_log
│ └── 00000000000000000000.json
├── part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet
├── part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet
├── part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet
└── part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet
从上面的结果可以看到,在创建 Delta 表的时候,生成了一个 json 文件,这个文件也是 Delta 的 transaction log,也就是事务日志,所以的事务相关操作都会记录到这个日志中,可以做 replay 使用,后面研究源码的时候会深入分析,和若干 parquet 文件(Delta 底层使用的文件格式)。
{"commitInfo":{"timestamp":1556253526941,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"3ae73d1f-4d33-4378-8f98-ba94b8204de9","format":{"provider":"parquet","options":{}},"schemaString":"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556253525810}}
{"add":{"path":"part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556253526000,"dataChange":true}}
2.3 Update the table data
为了方便展示,下图可以看到,用于 update 的数据是不一样的,见红色方框。
此时,表存储路径下的文件有了一些变化。
.
├── _delta_log
│ ├── 00000000000000000000.json
│ └── 00000000000000000001.json
├── part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet
├── part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet
├── part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet
├── part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet
├── part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet
├── part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet
├── part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet
└── part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet
再看一下 transaction log 的变化。涉及到了
{"commitInfo":{"timestamp":1556326035295,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}
{"remove":{"path":"part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
这里特别提醒一下,可以看到事务日志有两个操作,一个是 add,一个是 remove。特别留意一下时间戳,update 文件都是先 add 再 remove 的,所以 add 的时间1556326033000是更早于 remove 的时间1556326035294。
{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}
2.4 Read data
默认读取的,都是最新的文件。
scala> val df = spark.read.format("delta").load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df.show()
+---+
| id|
+---+
| 8|
| 9|
| 5|
| 7|
| 6|
+---+
2.5 Read older versions of data using Time Travel
Delta 为读取旧版本的数据,定义了个名字,叫做 Time travel,后面源码分析的时候会经常提及吧。Delta 提供了一个 option 来描述。
scala> val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]
scala> df.show()
+---+
| id|
+---+
| 3|
| 4|
| 0|
| 2|
| 1|
+---+
2.6 Write a stream of data to a table
通过 Structure Streaming 也可以写流式的表,可以提供 Exactly-one。流式表是采用 append 方式来追加数据到原表里的。
➜ _delta_log ll
total 160
-rw-r--r-- 1 runzhliu wheel 1.1K 4 27 08:47 00000000000000000000.json
-rw-r--r-- 1 runzhliu wheel 1.4K 4 27 08:47 00000000000000000001.json
-rw-r--r-- 1 runzhliu wheel 466B 4 27 09:21 00000000000000000002.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000003.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000004.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000005.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000006.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000007.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000008.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000009.json
-rw-r--r-- 1 runzhliu wheel 14K 4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000010.json
-rw-r--r-- 1 runzhliu wheel 977B 4 27 09:22 00000000000000000011.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000012.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000013.json
-rw-r--r-- 1 runzhliu wheel 979B 4 27 09:23 00000000000000000014.json
-rw-r--r-- 1 runzhliu wheel 25B 4 27 09:22 _last_checkpoint
➜ _delta_log ll
total 168
-rw-r--r-- 1 runzhliu wheel 1.1K 4 27 08:47 00000000000000000000.json
-rw-r--r-- 1 runzhliu wheel 1.4K 4 27 08:47 00000000000000000001.json
-rw-r--r-- 1 runzhliu wheel 466B 4 27 09:21 00000000000000000002.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000003.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000004.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:21 00000000000000000005.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000006.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000007.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000008.json
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000009.json
-rw-r--r-- 1 runzhliu wheel 14K 4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r-- 1 runzhliu wheel 976B 4 27 09:22 00000000000000000010.json
-rw-r--r-- 1 runzhliu wheel 977B 4 27 09:22 00000000000000000011.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000012.json
-rw-r--r-- 1 runzhliu wheel 809B 4 27 09:22 00000000000000000013.json
-rw-r--r-- 1 runzhliu wheel 979B 4 27 09:23 00000000000000000014.json
-rw-r--r-- 1 runzhliu wheel 979B 4 27 09:23 00000000000000000015.json
-rw-r--r-- 1 runzhliu wheel 25B 4 27 09:22 _last_checkpoint
可以看到流式表的事务日志是不断的在增加的。需要注意的是,流表在写的时候,是不影响读的,后面源码分析的时候,也会针对这个进行探索。
2.7 Read a stream of changes from a table
流式表可以边写边读,这里就不贴接结果了。
3 Summary
以上就是 Delta Lake 官网的 Qucik Start 的内容,建议大家可以按照以上内容来快速测试一下。
- Tarjan--LCA算法的个人理解即模板
- spark sql编程之实现合并Parquet格式的DataFrame的schema
- Oracle压缩黑科技(一)—基础表压缩
- 12 条用于 Linux 的 MySQL/MariaDB 安全最佳实践
- hdu----(4545)魔法串(LCS)
- Oracle压缩黑科技(二)—压缩数据的修改
- 在Pivotal Web Service上发布Spring Boot应用
- hdu---(1325)Is It A Tree?(并查集)
- spark2 sql编程样例:sql操作
- hdu----(1599)最大子矩阵(几何/dp)
- Go语言简单的TCP编程
- hdu---(1054)Strategic Game(最小覆盖边)
- Swagger Starter 1.4.0发布:新增swagger功能开源与全局参数的配置。
- Go语言语法汇总
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- MotherBoard JCOM to DB9 female By HKL,
- PHP实现根据请求的域名跳转到不同目录 By HKL, T
- frp定时监控脚本 By HKL, Tuesday 13
- iptables上入站流量同时启用DNAT和SNAT By HKL,
- tasker调用钉钉机器人实现短信转钉钉 By HKL,
- EdgeCore AS6700 ONIE Firmware固件 For HWr01c By HKL,
- sddm启动root登陆kde By HKL, Tuesd
- ELK收集处理Huawei/H3C交换机日志 By HKL,
- UniFi Register Device with keadhcp By HKL,
- Huawei esight to 钉钉dingding (RESTful API) By HKL,
- ngrokc rampis预编译版本 By HKL, Fr
- megacli修复raid1硬盘 By HKL, Tues
- DNSPOD自动更新公网IP脚本 By HKL, Wedn
- Zerotier网卡NAT via iptables By HKL,
- Github Pages同步到Qcloud腾讯云对象存储COS By HKL,