如何使用 Delta Lake 生成的列
这篇博文介绍了如何使用生成列创建 Delta Lake 表,这是一种根据其他列值自动填充列的绝佳方法。
您将了解在数据工作流程中何时可以使用生成列。
本文中的所有计算都在这个笔记本中,以防您想跟着在本地计算机上运行代码。
创建带有生成列的 Delta Lake 表
在本节中,我们将创建一个 Delta Lake 表,其中包含id
、first_name
、last_name
、age
和full_name
列。
当用户向表中追加数据时,将提供id
、first_name
、last_name
和age
列。
当数据追加到表中时,full_name
列将由 Delta Lake 生成。full_name
列将简单地连接first_name
和last_name
列,并用空格分隔它们。
以下是如何使用 Python 创建空 Delta 表
from delta import DeltaTable
(
DeltaTable.create(spark)
.tableName("default.some_people")
.addColumn("id", "LONG")
.addColumn("first_name", "STRING")
.addColumn("last_name", "STRING")
.addColumn("age", "LONG")
.addColumn(
"full_name", "STRING", generatedAlwaysAs="concat(first_name, ' ', last_name)"
)
.execute()
)
让我们显示 Delta 表并确保它是空的
spark.sql("select * from some_people").show()
+---+----------+---------+---+---------+
| id|first_name|last_name|age|full_name|
+---+----------+---------+---+---------+
+---+----------+---------+---+---------+
很好,现在让我们看看如何向此表追加数据并利用生成列功能。
插入到带有生成列的 Delta Lake 表中
让我们向 Delta Lake 表追加数据。我们将追加一个包含id
、first_name
、last_name
和age
列的 DataFrame。这将让我们观察 Delta Lake 如何自动生成full_name
列。
df = spark.createDataFrame(
[(0, "Bob", "Loblaw", 23),(1, "Sue", "Grafton", None), (2, "Jim", "Carrey", 61)]
).toDF("id", "first_name", "last_name", "age")
df.write.mode("append").format("delta").saveAsTable("some_people")
现在查询表并验证full_name
列是否已由 Delta Lake 填充
DeltaTable.forName(spark, "some_people").toDF().show()
+---+----------+---------+----+-----------+
| id|first_name|last_name| age| full_name|
+---+----------+---------+----+-----------+
| 2| Jim| Carrey| 61| Jim Carrey|
| 0| Bob| Loblaw| 23| Bob Loblaw|
| 1| Sue| Grafton|null|Sue Grafton|
+---+----------+---------+----+-----------+
让我们看看 Delta Lake 如何在 Delta 表中存储full_name
列以及相关的性能/存储影响。
Delta Lake 如何存储生成列的数据
Delta Lake 将生成列数据持久化存储。读取数据时,不会即时计算该列。数据在 DataFrame 追加到存储时计算并持久化。
让我们回顾一下 Delta 表的高层结构
Delta 表由存储在 Parquet 文件中的数据和存储在事务日志中的元数据组成。
生成列数据持久化在底层 Parquet 文件中,就像其他列的数据一样。
将数据持久化到存储会增加存储成本。Delta 表中多了一个持久化列,您需要为此付费。
持久化数据通常可以节省计算成本,因为在读取数据时不再需要计算full_name
值。您可以直接读取该值。
计算通常比存储更昂贵,因此策略性地生成列并节省计算通常会总体上为您节省资金。
现在让我们将注意力转向生成列如何与 Delta 表模式演变交互。
Delta Lake 生成列如何与模式演变配合使用
当启用 Delta Lake 模式演变时,您可以将包含缺失或额外列的 DataFrame 追加到 Delta 表中,详见这篇博文。
一旦启用列生成,某些列将变为必需,并且模式演变的行为将与平时不同。如果您在没有任何生成列的 Delta 表上启用模式演变,那么first_name
和last_name
列将是可选的。在这种情况下,full_name
列取决于first_name
和last_name
列,因此它们是必需的,即使启用了模式演变也是如此。
让我们看看当您尝试将没有 last_name 列的 DataFrame 追加到 Delta 表时会发生什么。
df = spark.createDataFrame(
[
(8, "Liam", 66),
(9, "Colin", 77),
]
).toDF("id", "first_name", "age")
df.write.option("mergeSchema", "true").mode("append").format("delta").saveAsTable(
"some_people"
)
这是您将收到的错误消息
AnalysisException: Column 'last_name' does not exist. Did you mean one of the following? [first_name, age, id]; line 1 pos 24;
'Project [id#2335L AS id#2391L, first_name#2336 AS first_name#2392, age#2337L AS age#2393L, 'concat(first_name#2336, , 'last_name) AS full_name#2394]
+- Project [_1#2329L AS id#2335L, _2#2330 AS first_name#2336, _3#2331L AS age#2337L]
+- LogicalRDD [_1#2329L, _2#2330, _3#2331L], false
现在让我们看看如果您尝试追加包含应生成的值的 DataFrame 会发生什么。
当用户提供应生成的列时会发生什么?
如果您尝试追加 Delta 表应为您生成的值,Delta Lake 将拒绝该事务。让我们尝试将包含full_name
列的 DataFrame 追加到 Delta 表。
首先创建 DataFrame
df = spark.createDataFrame([
(21, "Curtis", "Jackson", 47, "50 cent"),
(22, "Eric", "Wright", None, "easy-e"),
]).toDF("id", "first_name", "last_name", "age", "full_name")
现在尝试将其追加到 Delta 表中
df.write.mode("append").format("delta").saveAsTable("some_people")
Delta Lake 拒绝此事务并提供以下错误消息
Py4JJavaError: An error occurred while calling o234.saveAsTable.
: org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: CHECK constraint Generated Column (full_name <=> concat(first_name, ' ', last_name)) violated by row with values:
- first_name : Eric
- full_name : easy-e
- last_name : Wright
Delta Lake 有理由拒绝此事务。
当生成列依赖于具有 null 值的列时会发生什么?
让我们看看当您尝试将first_name
或last_name
列为null
的 DataFrame 追加到 Delta 表时会发生什么。
df = spark.createDataFrame(
[
(44, None, "Perkins", 20),
(55, "Li", None, 30),
]
).toDF("id", "first_name", "last_name", "age")
df.write.mode("append").format("delta").saveAsTable(
"some_people"
)
查看 DataFrame 的内容
spark.sql("select * from some_people").show()
+---+----------+---------+----+-----------+
| id|first_name|last_name| age| full_name|
+---+----------+---------+----+-----------+
| 2| Jim| Carrey| 61| Jim Carrey|
| 0| Bob| Loblaw| 23| Bob Loblaw|
| 1| Sue| Grafton|null|Sue Grafton|
| 44| null| Perkins| 20| null|
| 55| Li| null| 30| null|
+---+----------+---------+----+-----------+
在这种情况下,当first_name
或last_name
为null
时,full_name
为null
。此行为与 PySpark concat 函数的正常行为一致。
Delta Lake 生成列:结论
Delta Lake 允许您创建带有生成列的 Delta 表,这些列会根据其他列值自动计算并持久化存储。
生成列是自动且一致地填充 Delta 表中列的好方法。如果设置了生成列,您无需在追加之前手动向 DataFrame 追加列。
生成列只能添加到新创建或替换的 Delta 表中。您无法向现有 Delta 表添加生成列。
将生成列持久化到存储中是节省计算并加快查询速度的好方法。如果计算已提前运行并持久化到存储中,您在读取数据时无需计算值。
请查阅有关生成列的文档,以了解生成列支持的表达式类型。