Delta Lake 学习笔记(三)

时间:2022-07-22
本文章向大家介绍Delta Lake 学习笔记(三),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

文章目录

1 Overview

国际惯例,学习一个新的框架,应该先找找官方文档有没有 QucikStart 之类的文档,尤其像砖厂这种公司,文档应该不会少的。大家可以打开 Delta Lake 官网查看 QuickStart,按照文档迅速过一次。

2 QucikStart 走读

2.1 Set up Apache Spark with Delta Lake

因为要方便跑 demo,我这里选择用 spark-shell 来交互式探索一下 Delta Lake 的功能。

按照文档介绍,Delta Lake 是需要 Spark 2.4.2 或以上版本的,所以大家最好去官网下载一个预先编译的 Spark 包。

按照上图,输入命令 bin/spark-shell --packages io.delta:delta-core_2.12:0.1.0 就可以启动加载了 Delta Lake 的 spark shell 了。

关于 --packages 的用法,是因为 Spark 有个专门解析选项参数的工具叫做 SparkSubmitOptionParser,他可以解析到依赖,并且先在本地仓库找,没有的话就会根据你的 Maven 配置到远程拉取,这里 Spark 内部做了一些事情。

2.2 Create a table

创建一个 Delta 类型的表方法很简单,如下。

val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

然后我们到目录下看看。

➜  delta-table tree
.
├── _delta_log
│   └── 00000000000000000000.json
├── part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet
├── part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet
├── part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet
└── part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet

从上面的结果可以看到,在创建 Delta 表的时候,生成了一个 json 文件,这个文件也是 Delta 的 transaction log,也就是事务日志,所以的事务相关操作都会记录到这个日志中,可以做 replay 使用,后面研究源码的时候会深入分析,和若干 parquet 文件(Delta 底层使用的文件格式)。

{"commitInfo":{"timestamp":1556253526941,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"3ae73d1f-4d33-4378-8f98-ba94b8204de9","format":{"provider":"parquet","options":{}},"schemaString":"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1556253525810}}
{"add":{"path":"part-00000-80eac632-e80e-4b63-ba0b-07e83667544c-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00001-cfced55c-3129-4db2-9330-d72e03b9a1b2-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00002-7cbfe8b0-a046-4ae8-91e8-5eb1c7bcedf7-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556253526000,"dataChange":true}}
{"add":{"path":"part-00003-8cae5863-12f2-476e-9c1b-e29720a39b66-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556253526000,"dataChange":true}}

2.3 Update the table data

为了方便展示,下图可以看到,用于 update 的数据是不一样的,见红色方框。

此时,表存储路径下的文件有了一些变化。

.
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet
├── part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet
├── part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet
├── part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet
├── part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet
├── part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet
├── part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet
└── part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet

再看一下 transaction log 的变化。涉及到了

{"commitInfo":{"timestamp":1556326035295,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0}}
{"add":{"path":"part-00000-a2ff9308-0780-47bd-b666-8f5047802137-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00001-c92a2add-2922-4e27-af17-a199951244fc-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00002-368241a5-f83a-4741-88be-b5ccfedc6363-c000.snappy.parquet","partitionValues":{},"size":423,"modificationTime":1556326033000,"dataChange":true}}
{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}
{"remove":{"path":"part-00000-e96dc56e-2053-401a-8c18-fe34ccaf987b-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00002-0d57c68e-4c1b-43d0-8ecf-9491c7ccc140-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}
{"remove":{"path":"part-00001-f6ace3ea-b44c-47b1-9a90-79373f6b0dd1-c000.snappy.parquet","deletionTimestamp":1556326035295,"dataChange":true}}

这里特别提醒一下,可以看到事务日志有两个操作,一个是 add,一个是 remove。特别留意一下时间戳,update 文件都是先 add 再 remove 的,所以 add 的时间1556326033000是更早于 remove 的时间1556326035294。

{"add":{"path":"part-00003-15211aeb-9eae-4771-8d76-dbd8cf138546-c000.snappy.parquet","partitionValues":{},"size":431,"modificationTime":1556326033000,"dataChange":true}}
{"remove":{"path":"part-00003-8b38c077-bd5c-48f2-b342-c1eb9e8ccd03-c000.snappy.parquet","deletionTimestamp":1556326035294,"dataChange":true}}

2.4 Read data

默认读取的,都是最新的文件。

scala> val df = spark.read.format("delta").load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df.show()
+---+
| id|
+---+
|  8|
|  9|
|  5|
|  7|
|  6|
+---+

2.5 Read older versions of data using Time Travel

Delta 为读取旧版本的数据,定义了个名字,叫做 Time travel,后面源码分析的时候会经常提及吧。Delta 提供了一个 option 来描述。

scala> val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df.show()
+---+
| id|
+---+
|  3|
|  4|
|  0|
|  2|
|  1|
+---+

2.6 Write a stream of data to a table

通过 Structure Streaming 也可以写流式的表,可以提供 Exactly-one。流式表是采用 append 方式来追加数据到原表里的。

➜  _delta_log ll
total 160
-rw-r--r--  1 runzhliu  wheel   1.1K  4 27 08:47 00000000000000000000.json
-rw-r--r--  1 runzhliu  wheel   1.4K  4 27 08:47 00000000000000000001.json
-rw-r--r--  1 runzhliu  wheel   466B  4 27 09:21 00000000000000000002.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000003.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000004.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000005.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000006.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000007.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000008.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000009.json
-rw-r--r--  1 runzhliu  wheel    14K  4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000010.json
-rw-r--r--  1 runzhliu  wheel   977B  4 27 09:22 00000000000000000011.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000012.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000013.json
-rw-r--r--  1 runzhliu  wheel   979B  4 27 09:23 00000000000000000014.json
-rw-r--r--  1 runzhliu  wheel    25B  4 27 09:22 _last_checkpoint
➜  _delta_log ll
total 168
-rw-r--r--  1 runzhliu  wheel   1.1K  4 27 08:47 00000000000000000000.json
-rw-r--r--  1 runzhliu  wheel   1.4K  4 27 08:47 00000000000000000001.json
-rw-r--r--  1 runzhliu  wheel   466B  4 27 09:21 00000000000000000002.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000003.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000004.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:21 00000000000000000005.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000006.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000007.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000008.json
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000009.json
-rw-r--r--  1 runzhliu  wheel    14K  4 27 09:22 00000000000000000010.checkpoint.parquet
-rw-r--r--  1 runzhliu  wheel   976B  4 27 09:22 00000000000000000010.json
-rw-r--r--  1 runzhliu  wheel   977B  4 27 09:22 00000000000000000011.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000012.json
-rw-r--r--  1 runzhliu  wheel   809B  4 27 09:22 00000000000000000013.json
-rw-r--r--  1 runzhliu  wheel   979B  4 27 09:23 00000000000000000014.json
-rw-r--r--  1 runzhliu  wheel   979B  4 27 09:23 00000000000000000015.json
-rw-r--r--  1 runzhliu  wheel    25B  4 27 09:22 _last_checkpoint

可以看到流式表的事务日志是不断的在增加的。需要注意的是,流表在写的时候,是不影响读的,后面源码分析的时候,也会针对这个进行探索。

2.7 Read a stream of changes from a table

流式表可以边写边读,这里就不贴接结果了。

3 Summary

以上就是 Delta Lake 官网的 Qucik Start 的内容,建议大家可以按照以上内容来快速测试一下。