如何使用 pandas 创建和追加 Delta Lake 表
这篇博客文章解释了如何使用 pandas 创建和追加 Delta Lake 表。
您可以轻松地使用 pandas 创建 Delta Lake 表,并且无需依赖 Spark。
您还可以使用 pandas 追加到 Delta 表、覆盖 Delta 表以及覆盖特定的 Delta 表分区。Delta 事务的实现方式与 pandas 对 CSV 或 Parquet 等其他文件类型的操作不同。正常的 pandas 事务会不可逆地更改数据,而 Delta 事务则易于撤消。
让我们深入了解!如果您想在本地计算机上运行代码,请参阅此笔记本。
使用 pandas 创建 Delta Lake 表
让我们从创建 pandas DataFrame 开始。
import pandas as pd
df = pd.DataFrame({"x": [1, 2, 3]})
现在,我们将 pandas DataFrame 写入 Delta 表。
import os
from deltalake import DeltaTable
from deltalake.writer import write_deltalake
write_deltalake("tmp/some_delta_lake", df)
让我们将 Delta 表读回 pandas DataFrame,以确保它已正确写入
DeltaTable("tmp/some_delta_lake").to_pandas()
x
0 1
1 2
2 3
将 pandas DataFrame 写入 Delta 表并将 Delta 表读入 pandas DataFrame 非常容易。现在让我们看看如何向现有 Delta 表追加更多数据。
使用 pandas 追加到 Delta Lake 表
创建另一个 pandas DataFrame,它将被追加到 Delta 表中。
df2 = pd.DataFrame({"x": [8, 9, 10]})
现在将 pandas DataFrame 追加到 Delta 表。
write_deltalake("tmp/some_delta_lake", df2, mode="append")
将 Delta 表读入 pandas DataFrame,以确保 Delta 表已正确读取。
DeltaTable("tmp/some_delta_lake").to_pandas()
x
0 1
1 2
2 3
3 9
4 8
5 10
追加操作会向现有 Delta 表添加数据。现在让我们看看如何覆盖 Delta 表,这将删除所有现有数据并将其替换为新内容。
使用 pandas 覆盖 Delta Lake 表
创建另一个 pandas DataFrame,它将用于覆盖 Delta 表。
df3 = pd.DataFrame({"x": [55, 66, 77]})
执行覆盖事务。
write_deltalake("tmp/some_delta_lake", df3, mode="overwrite")
确认 Delta 表已被覆盖。
DeltaTable("tmp/some_delta_lake").to_pandas()
x
0 55
1 66
2 77
覆盖 Delta 表是一个逻辑操作。它实际上不会从磁盘中删除旧数据文件。请参阅Denny 的博客文章以获得对逻辑事务与物理事务的更多直觉。
由于数据文件未从磁盘中物理删除,您可以像这篇博客文章中所述,在不同版本的数据之间进行时间旅行。
使用 pandas 覆盖 Delta Lake 表分区
从 deltalake 0.8.1 版本开始,您现在可以使用谓词覆盖 Delta 表的分区。
创建一个带有 name
和 country
列的 pandas DataFrame,可以用于创建分区 Delta 表。
df = pd.DataFrame(
{"name": ["li", "xi", "sally", "fred"], "country": ["china", "china", "us", "us"]}
)
现在将 DataFrame 作为分区 Delta 表写入
write_deltalake(
"tmp/some_people",
df,
partition_by=["country"],
)
这是 Delta 表的内容。您可以看到它正在使用 Hive 样式分区。
tmp/some_people
├── _delta_log
│ └── 00000000000000000000.json
├── country=china
│ └── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet
└── country=us
└── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet
将 Delta 表读入 pandas DataFrame 时查看其内容
DeltaTable("tmp/some_people").to_pandas()
name country
0 li china
1 xi china
2 sally us
3 fred us
创建另一个包含三名来自中国的人的 DataFrame。
df = pd.DataFrame(
{"name": ["jack", "bruce", "yao"], "country": ["china", "china", "china"]}
)
用新的 DataFrame 覆盖中国分区。您应该只覆盖中国分区,并保持其他分区不变。
write_deltalake(
"tmp/some_people",
df,
mode="overwrite",
partition_filters=[("country", "=", "china")],
)
读取 Delta 表以确认 Delta 表内容符合预期。
DeltaTable("tmp/some_people").to_pandas()
name country
0 sally us
1 fred us
2 jack china
3 bruce china
4 yao china
您仍然可以时间旅行回到 Delta 表的初始版本
DeltaTable("tmp/some_people", version=0).to_pandas()
name country
0 li china
1 xi china
2 sally us
3 fred us
您可以查看存储中的文件,并看到一个文件已添加到中国分区中
tmp/some_people
├── _delta_log
│ ├── 00000000000000000000.json
│ └── 00000000000000000001.json
├── country=china
│ ├── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet
│ └── 1-45cf731b-382f-4244-b156-d1f009f02a80-0.parquet
└── country=us
└── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet
Delta Lake 覆盖操作是逻辑操作,而不是物理操作。Delta Lake 通过元数据操作逻辑上删除文件。它不会通过从存储中删除文件来物理删除文件。
让我们查看与覆盖事务相关的事务日志条目以获得更好的理解。
{
"add": {
"path": "country=china/1-41f18aa2-9707-4716-b5ae-4089cf778756-0.parquet",
"size": 1859,
"partitionValues": {
"country": "china"
},
"modificationTime": 1679455801261,
"dataChange": true,
"stats": "{\"numRecords\": 3, \"minValues\": {\"name\": \"bruce\"}, \"maxValues\": {\"name\": \"yao\"}, \"nullCount\": {\"name\": 0}}",
"tags": null
}
}
{
"remove": {
"path": "country=china/0-7220ecd3-1497-485d-9b85-583cf4fd6be7-0.parquet",
"deletionTimestamp": 1679455801261,
"dataChange": true,
"extendedFileMetadata": false,
"partitionValues": {
"country": "china"
},
"size": 1834,
"tags": null
}
}
{
"commitInfo": {
"timestamp": 1679455801262,
"operation": "WRITE",
"operationParameters": {
"partitionBy": "[\"country\"]",
"mode": "Overwrite"
},
"clientVersion": "delta-rs.0.8.0"
}
}
使用 pandas 覆盖 Delta 表中的分区非常容易!
使用 pandas 创建分区 Parquet 表
让我们像以前一样创建相同的 DataFrame,但将其写入使用 pandas 分区的 Parquet DataFrame。
重新创建原始 pandas DataFrame
df = pd.DataFrame(
{"name": ["li", "xi", "sally", "fred"], "country": ["china", "china", "us", "us"]}
)
将 DataFrame 写入分区 Parquet 数据集
df.to_parquet("tmp/some_people_parquet", partition_cols=["country"])
检查存储中的文件
tmp/some_people_parquet
├── country=china
│ └── de44a20d63a8443ba94883fc956a239d-0.parquet
└── country=us
└── de44a20d63a8443ba94883fc956a239d-0.parquet
查看其中一个 Parquet 文件的内容
pd.read_parquet(
"tmp/some_people_parquet/country=china/de44a20d63a8443ba94883fc956a239d-0.parquet"
)
name
0 li
1 xi
Parquet 文件遵循 Hive 样式分区,并且不包含 country
列。country
列中的数据已抽象到目录结构中。
现在让我们看看如何使用 pandas 覆盖分区 Parquet 表的分区。
使用 pandas 覆盖分区 Parquet 表的分区
我们需要做一些修改才能使用 pandas 覆盖 Parquet 表分区。
首先创建与之前相同的 DataFrame,它将进行覆盖
df = pd.DataFrame(
{"name": ["jack", "bruce", "yao"], "country": ["china", "china", "china"]}
)
在追加数据之前,我们需要删除 country
列以遵循 Hive 样式分区约定。
df2 = df.drop(columns=["country"])
将新数据追加到分区 Parquet 表中。您需要手动指定正确的文件夹。
df2.to_parquet("tmp/some_people_parquet/country=china/new-data.parquet")
查看存储中 Parquet 表的内容
tmp/some_people_parquet
├── country=china
│ ├── de44a20d63a8443ba94883fc956a239d-0.parquet
│ └── new-data.parquet
└── country=us
└── de44a20d63a8443ba94883fc956a239d-0.parquet
这是 Parquet 表的内容
name country
0 li china
1 xi china
2 jack china
3 bruce china
4 yao china
5 sally us
6 fred us
我们没有覆盖分区,我们只是追加到分区。让我们手动删除旧数据文件
rm tmp/some_people_parquet/country=china/de44a20d63a8443ba94883fc956a239d-0.parquet
我们的 Parquet 表现在包含正确的数据
pd.read_parquet("tmp/some_people_parquet")
name country
0 jack china
1 bruce china
2 yao china
3 sally us
4 fred us
覆盖 Delta 表分区比 pandas 为 Parquet 表提供的功能要好得多。
Delta Lake 在这里与众不同的是,它会验证您传递的数据是否与您要覆盖的分区匹配,如果出现错误,它将报错。它也不要求用户执行手动和危险的文件删除操作。
为什么 Delta Lake 对于许多 pandas 分析优于 Parquet
与 Parquet 相比,Delta Lake 在 pandas 分析方面具有许多优势
- 模式强制
- 模式演变
- 读取数据时跳过文件
- ACID 事务
- DML 事务
- 版本化数据
- 还有更多…
对于大多数分析,Delta Lake 提供比普通 Parquet 更好的用户体验。当然,Parquet 比 CSV 更好,原因在此视频中解释。
结论
有几种方法可以使用 pandas 创建和追加数据到 Delta 表。
您可以追加到现有的 Delta 表,完全覆盖 Delta 表,或覆盖 Delta 表中的特定分区。
Delta Lake 追加和覆盖事务是逻辑操作,因此您仍然可以时间旅行到数据的早期版本或回滚以撤消错误。
Delta Lake 事务易于撤消,并且它们不会删除您的旧数据文件,因此您始终可以时间旅行回到数据的早期版本。这些功能是 pandas 用户生活质量的关键改进。不再因常规 pandas 覆盖而意外删除数据,这可能会导致数据丢失!