The Linux Foundation Projects
Delta Lake

如何使用 pandas 创建和追加 Delta Lake 表

作者:Matthew Powers

这篇博客文章解释了如何使用 pandas 创建和追加 Delta Lake 表。

您可以轻松地使用 pandas 创建 Delta Lake 表,并且无需依赖 Spark。

您还可以使用 pandas 追加到 Delta 表、覆盖 Delta 表以及覆盖特定的 Delta 表分区。Delta 事务的实现方式与 pandas 对 CSV 或 Parquet 等其他文件类型的操作不同。正常的 pandas 事务会不可逆地更改数据,而 Delta 事务则易于撤消。

让我们深入了解!如果您想在本地计算机上运行代码,请参阅此笔记本

使用 pandas 创建 Delta Lake 表

让我们从创建 pandas DataFrame 开始。

import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3]})

现在,我们将 pandas DataFrame 写入 Delta 表。

import os
from deltalake import DeltaTable
from deltalake.writer import write_deltalake

write_deltalake("tmp/some_delta_lake", df)

让我们将 Delta 表读回 pandas DataFrame,以确保它已正确写入

DeltaTable("tmp/some_delta_lake").to_pandas()

	x
0	1
1	2
2	3

将 pandas DataFrame 写入 Delta 表并将 Delta 表读入 pandas DataFrame 非常容易。现在让我们看看如何向现有 Delta 表追加更多数据。

使用 pandas 追加到 Delta Lake 表

创建另一个 pandas DataFrame,它将被追加到 Delta 表中。

df2 = pd.DataFrame({"x": [8, 9, 10]})

现在将 pandas DataFrame 追加到 Delta 表。

write_deltalake("tmp/some_delta_lake", df2, mode="append")

将 Delta 表读入 pandas DataFrame,以确保 Delta 表已正确读取。

DeltaTable("tmp/some_delta_lake").to_pandas()

	x
0	1
1	2
2	3
3	9
4	8
5	10

追加操作会向现有 Delta 表添加数据。现在让我们看看如何覆盖 Delta 表,这将删除所有现有数据并将其替换为新内容。

使用 pandas 覆盖 Delta Lake 表

创建另一个 pandas DataFrame,它将用于覆盖 Delta 表。

df3 = pd.DataFrame({"x": [55, 66, 77]})

执行覆盖事务。

write_deltalake("tmp/some_delta_lake", df3, mode="overwrite")

确认 Delta 表已被覆盖。

DeltaTable("tmp/some_delta_lake").to_pandas()

	x
0	55
1	66
2	77

覆盖 Delta 表是一个逻辑操作。它实际上不会从磁盘中删除旧数据文件。请参阅Denny 的博客文章以获得对逻辑事务与物理事务的更多直觉。

由于数据文件未从磁盘中物理删除,您可以像这篇博客文章中所述,在不同版本的数据之间进行时间旅行。

使用 pandas 覆盖 Delta Lake 表分区

deltalake 0.8.1 版本开始,您现在可以使用谓词覆盖 Delta 表的分区。

创建一个带有 namecountry 列的 pandas DataFrame,可以用于创建分区 Delta 表。

df = pd.DataFrame(
    {"name": ["li", "xi", "sally", "fred"], "country": ["china", "china", "us", "us"]}
)

现在将 DataFrame 作为分区 Delta 表写入

write_deltalake(
    "tmp/some_people",
    df,
    partition_by=["country"],
)

这是 Delta 表的内容。您可以看到它正在使用 Hive 样式分区。

tmp/some_people
├── _delta_log
│   └── 00000000000000000000.json
├── country=china
│   └── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet
└── country=us
    └── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet

将 Delta 表读入 pandas DataFrame 时查看其内容

DeltaTable("tmp/some_people").to_pandas()

	name	country
0	li	china
1	xi	china
2	sally	us
3	fred	us

创建另一个包含三名来自中国的人的 DataFrame。

df = pd.DataFrame(
    {"name": ["jack", "bruce", "yao"], "country": ["china", "china", "china"]}
)

用新的 DataFrame 覆盖中国分区。您应该只覆盖中国分区,并保持其他分区不变。

write_deltalake(
    "tmp/some_people",
    df,
    mode="overwrite",
    partition_filters=[("country", "=", "china")],
)

读取 Delta 表以确认 Delta 表内容符合预期。

DeltaTable("tmp/some_people").to_pandas()

	name	country
0	sally	us
1	fred	us
2	jack	china
3	bruce	china
4	yao	china

您仍然可以时间旅行回到 Delta 表的初始版本

DeltaTable("tmp/some_people", version=0).to_pandas()

	name	country
0	li	china
1	xi	china
2	sally	us
3	fred	us

您可以查看存储中的文件,并看到一个文件已添加到中国分区中

tmp/some_people
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
├── country=china
│   ├── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet
│   └── 1-45cf731b-382f-4244-b156-d1f009f02a80-0.parquet
└── country=us
    └── 0-dd1deda9-b862-47fb-8ffd-4c91d410ad31-0.parquet

Delta Lake 覆盖操作是逻辑操作,而不是物理操作。Delta Lake 通过元数据操作逻辑上删除文件。它不会通过从存储中删除文件来物理删除文件。

让我们查看与覆盖事务相关的事务日志条目以获得更好的理解。

{
  "add": {
    "path": "country=china/1-41f18aa2-9707-4716-b5ae-4089cf778756-0.parquet",
    "size": 1859,
    "partitionValues": {
      "country": "china"
    },
    "modificationTime": 1679455801261,
    "dataChange": true,
    "stats": "{\"numRecords\": 3, \"minValues\": {\"name\": \"bruce\"}, \"maxValues\": {\"name\": \"yao\"}, \"nullCount\": {\"name\": 0}}",
    "tags": null
  }
}
{
  "remove": {
    "path": "country=china/0-7220ecd3-1497-485d-9b85-583cf4fd6be7-0.parquet",
    "deletionTimestamp": 1679455801261,
    "dataChange": true,
    "extendedFileMetadata": false,
    "partitionValues": {
      "country": "china"
    },
    "size": 1834,
    "tags": null
  }
}
{
  "commitInfo": {
    "timestamp": 1679455801262,
    "operation": "WRITE",
    "operationParameters": {
      "partitionBy": "[\"country\"]",
      "mode": "Overwrite"
    },
    "clientVersion": "delta-rs.0.8.0"
  }
}

使用 pandas 覆盖 Delta 表中的分区非常容易!

使用 pandas 创建分区 Parquet 表

让我们像以前一样创建相同的 DataFrame,但将其写入使用 pandas 分区的 Parquet DataFrame。

重新创建原始 pandas DataFrame

df = pd.DataFrame(
    {"name": ["li", "xi", "sally", "fred"], "country": ["china", "china", "us", "us"]}
)

将 DataFrame 写入分区 Parquet 数据集

df.to_parquet("tmp/some_people_parquet", partition_cols=["country"])

检查存储中的文件

tmp/some_people_parquet
├── country=china
│   └── de44a20d63a8443ba94883fc956a239d-0.parquet
└── country=us
    └── de44a20d63a8443ba94883fc956a239d-0.parquet

查看其中一个 Parquet 文件的内容

pd.read_parquet(
    "tmp/some_people_parquet/country=china/de44a20d63a8443ba94883fc956a239d-0.parquet"
)

	name
0	li
1	xi

Parquet 文件遵循 Hive 样式分区,并且不包含 country 列。country 列中的数据已抽象到目录结构中。

现在让我们看看如何使用 pandas 覆盖分区 Parquet 表的分区。

使用 pandas 覆盖分区 Parquet 表的分区

我们需要做一些修改才能使用 pandas 覆盖 Parquet 表分区。

首先创建与之前相同的 DataFrame,它将进行覆盖

df = pd.DataFrame(
    {"name": ["jack", "bruce", "yao"], "country": ["china", "china", "china"]}
)

在追加数据之前,我们需要删除 country 列以遵循 Hive 样式分区约定。

df2 = df.drop(columns=["country"])

将新数据追加到分区 Parquet 表中。您需要手动指定正确的文件夹。

df2.to_parquet("tmp/some_people_parquet/country=china/new-data.parquet")

查看存储中 Parquet 表的内容

tmp/some_people_parquet
├── country=china
│   ├── de44a20d63a8443ba94883fc956a239d-0.parquet
│   └── new-data.parquet
└── country=us
    └── de44a20d63a8443ba94883fc956a239d-0.parquet

这是 Parquet 表的内容

	name	country
0	li	china
1	xi	china
2	jack	china
3	bruce	china
4	yao	china
5	sally	us
6	fred	us

我们没有覆盖分区,我们只是追加到分区。让我们手动删除旧数据文件

rm tmp/some_people_parquet/country=china/de44a20d63a8443ba94883fc956a239d-0.parquet

我们的 Parquet 表现在包含正确的数据

pd.read_parquet("tmp/some_people_parquet")

	name	country
0	jack	china
1	bruce	china
2	yao	china
3	sally	us
4	fred	us

覆盖 Delta 表分区比 pandas 为 Parquet 表提供的功能要好得多。

Delta Lake 在这里与众不同的是,它会验证您传递的数据是否与您要覆盖的分区匹配,如果出现错误,它将报错。它也不要求用户执行手动和危险的文件删除操作。

为什么 Delta Lake 对于许多 pandas 分析优于 Parquet

与 Parquet 相比,Delta Lake 在 pandas 分析方面具有许多优势

  • 模式强制
  • 模式演变
  • 读取数据时跳过文件
  • ACID 事务
  • DML 事务
  • 版本化数据
  • 还有更多…

对于大多数分析,Delta Lake 提供比普通 Parquet 更好的用户体验。当然,Parquet 比 CSV 更好,原因在此视频中解释

结论

有几种方法可以使用 pandas 创建和追加数据到 Delta 表。

您可以追加到现有的 Delta 表,完全覆盖 Delta 表,或覆盖 Delta 表中的特定分区。

Delta Lake 追加和覆盖事务是逻辑操作,因此您仍然可以时间旅行到数据的早期版本或回滚以撤消错误。

Delta Lake 事务易于撤消,并且它们不会删除您的旧数据文件,因此您始终可以时间旅行回到数据的早期版本。这些功能是 pandas 用户生活质量的关键改进。不再因常规 pandas 覆盖而意外删除数据,这可能会导致数据丢失!

LinkedIn 上关注我们的作者