The Linux Foundation Projects
Delta Lake

从 Parquet 转换为 Delta Lake

作者:Matthew Powers

Delta Lake 相较于普通 Parquet 表有诸多优势,例如支持 ACID 事务、时间旅行和并发控制,以及用于提升查询性能的优化。您可以轻松利用这些特性,将 Parquet 表转换为 Delta Lake。代码简单,Parquet 文件无需重写,因此所需的计算资源比您想象的要少。

本文将向您展示如何将 Parquet 表转换为 Delta Lake,以便您可以享受 Delta Lake 提供的所有优势。转换是原地操作,因此既快速又经济。

Parquet 到 Delta Lake API

Delta Lake 提供了一个 API,DeltaTable.convertToDelta,用于将 Parquet 表转换为 Delta Lake。例如,我们可以使用以下代码,通过 PySpark 将未分区的 Parquet 表转换为 Delta Lake

from delta.tables import *

deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

让我们创建一个 Parquet 数据集,并在真实文件集上运行此命令。我们将首先创建一个包含三行数据的 Parquet 表

columns = ["language", "num_speakers"]
data = [("English", "1.5"), ("Mandarin", "1.1"), ("Hindi", "0.6")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)

df.write.format("parquet").save("tmp/lake1")

这是创建的文件

tmp/lake1
├── _SUCCESS
├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
└── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet

现在,让我们运行代码将 Parquet 表转换为 Delta Lake

deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake1`")

这是 Delta Lake 的内容

tmp/lake1
├── _SUCCESS
├── _delta_log
│   ├── 00000000000000000000.checkpoint.parquet
│   ├── 00000000000000000000.json
│   └── _last_checkpoint
├── part-00000-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00003-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
├── part-00006-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet
└── part-00009-b84573b6-b805-4162-9143-9c598b80c289-c000.snappy.parquet

该命令扫描所有 Parquet 文件并构建 `_delta_log` 目录,其中包含 Delta Lake 数据查询所需的元数据。请注意,Delta Lake 中的所有 Parquet 文件与 Parquet 表中的 Parquet 文件相同。尽管增加了元数据,但从 Parquet 转换为 Delta Lake 只会导致存储成本略微增加,因为没有数据被重写:`convertToDelta` 是一个原地操作。您可以访问的附加功能绝对值得这微小的成本增加。

在这个示例中,操作很快,因为只有两个 Parquet 文件。当然,当文件更多时,构建 `_delta_log` 需要更多时间。

将分区 Parquet 表转换为 Delta Lake

现在让我们看看将分区 Parquet 表转换为 Delta Lake 的过程。我们将从创建表开始

df.write.partitionBy("language").format("parquet").save("tmp/lake2")

这是磁盘上的文件

tmp/lake2
├── _SUCCESS
├── language=English
│   └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
├── language=Hindi
│   └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
└── language=Mandarin
    └── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

现在让我们尝试将此 Parquet 表转换为 Delta Lake

deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`")

但是,有一个问题——此代码会报错,显示以下消息

AnalysisException: Expecting 0 partition column(s): [], but found 1 partition column(s): [`language`] from parsing the file name: file:/.../delta-examples/notebooks/pyspark/tmp/lake2/language=English/part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

在 Parquet 表中,分区列的数据类型由目录名称决定,这可能含糊不清。例如,当它读取目录名称 date=2022-09-21 时,Delta Lake 无法知道日期分区列的哪种数据类型是所需的——应该是字符串、日期还是时间戳?因此,您需要提供第三个参数:一个 Hive DDL 格式的字符串,指定分区列的名称和数据类型。例如

deltaTable = DeltaTable.convertToDelta(spark, "parquet.`tmp/lake2`", "language STRING")

转换后磁盘上的文件

tmp/lake2
├── _SUCCESS
├── _delta_log
│   ├── 00000000000000000000.checkpoint.parquet
│   ├── 00000000000000000000.json
│   └── _last_checkpoint
├── language=English
│   └── part-00003-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
├── language=Hindi
│   └── part-00009-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet
└── language=Mandarin
    └── part-00006-fa662100-1eff-4609-a0dd-794b5eec991a.c000.snappy.parquet

这个过程同样简单,但当您将磁盘分区的 Parquet 表转换为 Delta Lake 时,需要记住这个小小的额外要求。如果您不这样做,Delta Lake 会提醒您!

转换为 Delta Lakes 的优缺点

Delta Lake 相对于普通的 Parquet 表有诸多优势:它允许在不同版本的数据之间进行时间旅行、ACID 事务、并发安全性以及各种其他好处。转换为 Delta Lake 既快速又简单,几乎没有缺点。

需要注意的一点是,当存在大量 Parquet 文件时,从 Parquet 表转换为 Delta Lake 的计算成本可能会很高。这是因为转换过程需要打开所有文件并计算元数据统计信息以构建 `_delta_log`。

此外,一旦 Parquet 表转换为 Delta Lake,它只能由具有 Delta Lake 读取器的查询引擎读取。Delta Lake 拥有快速增长的连接器生态系统,因此已经支持与大多数查询引擎的互操作性;但是,一些查询引擎支持读取普通的 Parquet 表,但尚不支持读取 Delta Lake。因此,Parquet 可能与某些传统技术更好地集成,但这些情况正变得越来越少见。

下一步

从 Parquet 转换为 Delta Lake 很容易,这不足为奇,因为它们都是开放技术。Parquet 是一种开源文件格式,而 Delta Lake 是一种开源文件协议,它将数据存储在 Parquet 文件中。

您在本博文中看到的所有代码片段都是完全开源的,您可以轻松地在本地机器上运行它们。从 Parquet 转换为 Delta Lake 不需要以专有文件格式重写数据,这将大大增加计算成本。

如前所述,Delta Lake 庞大的连接器生态系统意味着您可能不需要经常在 Parquet 和 Delta Lake 之间进行转换。此操作主要在您的组织决定从 Parquet 切换到 Delta Lake 以利用 Delta Lake 免费提供的所有额外功能时派上用场:运行一次,您的组织就可以无缝享受新格式带来的好处!

LinkedIn 上关注我们的作者