The Linux Foundation Projects
Delta Lake

如何使用 Delta Lake 生成的列

作者:Matthew Powers

这篇博文介绍了如何使用生成列创建 Delta Lake 表,这是一种根据其他列值自动填充列的绝佳方法。

您将了解在数据工作流程中何时可以使用生成列。

本文中的所有计算都在这个笔记本中,以防您想跟着在本地计算机上运行代码。

创建带有生成列的 Delta Lake 表

在本节中,我们将创建一个 Delta Lake 表,其中包含idfirst_namelast_nameagefull_name列。

当用户向表中追加数据时,将提供idfirst_namelast_nameage列。

当数据追加到表中时,full_name列将由 Delta Lake 生成。full_name列将简单地连接first_namelast_name列,并用空格分隔它们。

以下是如何使用 Python 创建空 Delta 表

from delta import DeltaTable

(
    DeltaTable.create(spark)
    .tableName("default.some_people")
    .addColumn("id", "LONG")
    .addColumn("first_name", "STRING")
    .addColumn("last_name", "STRING")
    .addColumn("age", "LONG")
    .addColumn(
        "full_name", "STRING", generatedAlwaysAs="concat(first_name, ' ', last_name)"
    )
    .execute()
)

让我们显示 Delta 表并确保它是空的

spark.sql("select * from some_people").show()

+---+----------+---------+---+---------+
| id|first_name|last_name|age|full_name|
+---+----------+---------+---+---------+
+---+----------+---------+---+---------+

很好,现在让我们看看如何向此表追加数据并利用生成列功能。

插入到带有生成列的 Delta Lake 表中

让我们向 Delta Lake 表追加数据。我们将追加一个包含idfirst_namelast_nameage列的 DataFrame。这将让我们观察 Delta Lake 如何自动生成full_name列。

df = spark.createDataFrame(
    [(0, "Bob", "Loblaw", 23),(1, "Sue", "Grafton", None), (2, "Jim", "Carrey", 61)]
).toDF("id", "first_name", "last_name", "age")

df.write.mode("append").format("delta").saveAsTable("some_people")

现在查询表并验证full_name列是否已由 Delta Lake 填充

DeltaTable.forName(spark, "some_people").toDF().show()

+---+----------+---------+----+-----------+
| id|first_name|last_name| age|  full_name|
+---+----------+---------+----+-----------+
|  2|       Jim|   Carrey|  61| Jim Carrey|
|  0|       Bob|   Loblaw|  23| Bob Loblaw|
|  1|       Sue|  Grafton|null|Sue Grafton|
+---+----------+---------+----+-----------+

让我们看看 Delta Lake 如何在 Delta 表中存储full_name列以及相关的性能/存储影响。

Delta Lake 如何存储生成列的数据

Delta Lake 将生成列数据持久化存储。读取数据时,不会即时计算该列。数据在 DataFrame 追加到存储时计算并持久化。

让我们回顾一下 Delta 表的高层结构

Delta 表由存储在 Parquet 文件中的数据和存储在事务日志中的元数据组成。

生成列数据持久化在底层 Parquet 文件中,就像其他列的数据一样。

将数据持久化到存储会增加存储成本。Delta 表中多了一个持久化列,您需要为此付费。

持久化数据通常可以节省计算成本,因为在读取数据时不再需要计算full_name值。您可以直接读取该值。

计算通常比存储更昂贵,因此策略性地生成列并节省计算通常会总体上为您节省资金。

现在让我们将注意力转向生成列如何与 Delta 表模式演变交互。

Delta Lake 生成列如何与模式演变配合使用

当启用 Delta Lake 模式演变时,您可以将包含缺失或额外列的 DataFrame 追加到 Delta 表中,详见这篇博文

一旦启用列生成,某些列将变为必需,并且模式演变的行为将与平时不同。如果您在没有任何生成列的 Delta 表上启用模式演变,那么first_namelast_name列将是可选的。在这种情况下,full_name列取决于first_namelast_name列,因此它们是必需的,即使启用了模式演变也是如此。

让我们看看当您尝试将没有 last_name 列的 DataFrame 追加到 Delta 表时会发生什么。

df = spark.createDataFrame(
    [
        (8, "Liam", 66),
        (9, "Colin", 77),
    ]
).toDF("id", "first_name", "age")

df.write.option("mergeSchema", "true").mode("append").format("delta").saveAsTable(
    "some_people"
)

这是您将收到的错误消息

AnalysisException: Column 'last_name' does not exist. Did you mean one of the following? [first_name, age, id]; line 1 pos 24;
'Project [id#2335L AS id#2391L, first_name#2336 AS first_name#2392, age#2337L AS age#2393L, 'concat(first_name#2336,  , 'last_name) AS full_name#2394]
+- Project [_1#2329L AS id#2335L, _2#2330 AS first_name#2336, _3#2331L AS age#2337L]
   +- LogicalRDD [_1#2329L, _2#2330, _3#2331L], false

现在让我们看看如果您尝试追加包含应生成的值的 DataFrame 会发生什么。

当用户提供应生成的列时会发生什么?

如果您尝试追加 Delta 表应为您生成的值,Delta Lake 将拒绝该事务。让我们尝试将包含full_name列的 DataFrame 追加到 Delta 表。

首先创建 DataFrame

df = spark.createDataFrame([
    (21, "Curtis", "Jackson", 47, "50 cent"),
    (22, "Eric", "Wright", None, "easy-e"),
]).toDF("id", "first_name", "last_name", "age", "full_name")

现在尝试将其追加到 Delta 表中

df.write.mode("append").format("delta").saveAsTable("some_people")

Delta Lake 拒绝此事务并提供以下错误消息

Py4JJavaError: An error occurred while calling o234.saveAsTable.
: org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: CHECK constraint Generated Column (full_name <=> concat(first_name, ' ', last_name)) violated by row with values:
 - first_name : Eric
 - full_name : easy-e
 - last_name : Wright

Delta Lake 有理由拒绝此事务。

当生成列依赖于具有 null 值的列时会发生什么?

让我们看看当您尝试将first_namelast_name列为null的 DataFrame 追加到 Delta 表时会发生什么。

df = spark.createDataFrame(
    [
        (44, None, "Perkins", 20),
        (55, "Li", None, 30),
    ]
).toDF("id", "first_name", "last_name", "age")

df.write.mode("append").format("delta").saveAsTable(
    "some_people"
)

查看 DataFrame 的内容

spark.sql("select * from some_people").show()

+---+----------+---------+----+-----------+
| id|first_name|last_name| age|  full_name|
+---+----------+---------+----+-----------+
|  2|       Jim|   Carrey|  61| Jim Carrey|
|  0|       Bob|   Loblaw|  23| Bob Loblaw|
|  1|       Sue|  Grafton|null|Sue Grafton|
| 44|      null|  Perkins|  20|       null|
| 55|        Li|     null|  30|       null|
+---+----------+---------+----+-----------+

在这种情况下,当first_namelast_namenull时,full_namenull。此行为与 PySpark concat 函数的正常行为一致。

Delta Lake 生成列:结论

Delta Lake 允许您创建带有生成列的 Delta 表,这些列会根据其他列值自动计算并持久化存储。

生成列是自动且一致地填充 Delta 表中列的好方法。如果设置了生成列,您无需在追加之前手动向 DataFrame 追加列。

生成列只能添加到新创建或替换的 Delta 表中。您无法向现有 Delta 表添加生成列。

将生成列持久化到存储中是节省计算并加快查询速度的好方法。如果计算已提前运行并持久化到存储中,您在读取数据时无需计算值。

请查阅有关生成列的文档,以了解生成列支持的表达式类型。

LinkedIn 上关注我们的作者