Delta Lake 合并
作者:Nick Karpov
MERGE
是 Delta Lake 最强大的操作。通过合并,您可以在单个事务中应用所有三种标准数据操纵语言操作(INSERT
、UPDATE
和 DELETE
)。您还可以为每个操作添加多个条件,以应对更复杂的场景和数据集。
在本文中,我们将逐个示例地探索合并命令的全部范围,到本文结束时,您将能够构建广泛的合并查询来满足您的用例。
MERGE
具有众所周知的 SQL 语法,因此本文中我们将使用 PySpark API (merge
) 来强调 Delta Lake Spark 连接器也支持 Python 和 Scala。
如果您想跟着操作,所有代码片段都在此 Jupyter Notebook中。
何时使用 Delta Lake 合并
合并是许多 ETL 用例的主力。以下是一些激励人心的例子:
通常,Delta Lake 合并非常适合当您想要对 Delta 表进行选择性更改而无需重写整个表时。
带 whenNotMatchedInsert 的 Delta Lake 合并
接收包含现有数据和新数据的数据集是很常见的。这意味着我们不能简单地附加新数据集而无需执行额外的逻辑。对于传统的 Hive 风格 Parquet 表,我们可能还需要重写整个表。
通过 Delta Lake merge
,我们可以轻松表达如何处理这种情况并避免重写整个表。
让我们从一个简单的例子开始。
首先创建一个初始 Delta Lake 表,其中包含 3 行 name
、age
和唯一的 id
。
data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]
df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("delta").save("/tmp/people")
注意:repartition(1)
用于输出单个文件,使演示更清晰。
现在让我们模拟我们将收到的新数据集。注意,我们既有新行,也有在上面引导的初始人员 Delta Lake 表中已经存在的行。
new_data = [
(0, "Bob", 23), # exists in our original dataset above
(3, "Sally", 30), # new data
(4, "Henry", 33), # new data
]
new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)
让我们初始化对现有 Delta Lake 表的引用。
from delta.tables import DeltaTable
people_table = DeltaTable.forPath(spark, "/tmp/people")
现在我们可以使用带有 whenNotMatchedInsert 子句的合并,只添加不匹配现有行的新行。
people_table.alias("target").merge(
new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
values={"id": "source.id", "name": "source.name", "age": "source.age"}
).execute()
设置合并命令需要几个步骤,所以让我们分解一下。
首先,我们为 Delta Lake 表设置别名,以便我们可以在后续表达式中引用它。通常将现有表称为“目标”。我们将继续使用此术语。
people_table.alias("target")
其次,我们调用 merge,它接受两个参数:一个新数据集和连接条件。
.merge(
new_df.alias("source"), "target.id = source.id"
)
与上面的 target
一样,将新数据集称为源是很常见的做法,因此我们也为 new_df 设置别名为 source
。
我们两个数据集的连接条件使用 id
字段,其中 target.id = source.id
,但也可以将其扩展为更复杂的场景中的任意表达式。我们将在后面的示例中探讨这一点。
第三,我们调用 whenNotMatchedInsert
子句,其中包含一个字典,该字典具有表示我们要插入的字段和值的键值对。我们上面定义的连接条件决定了 whenNotMatchedInsert
将应用于哪些行。在这种情况下,whenNotMatchedInsert
子句将应用于不匹配连接条件的行,即目标表中不存在的行。请注意,我们继续使用 source
别名来引用新数据集。
.whenNotMatchedInsert(
values={"id": "source.id", "name": "source.name", "age": "source.age"}
).execute()
在上面的示例中,我们传递了一个字典,以演示您可以自定义插入新值的方式。键是我们想要插入的字段,每个键的值是我们想要为每个相应字段插入的表达式。当 source
和 target
数据集没有相同的模式,或者当您想要做的事情不仅仅是像这个示例中那样简单地复制数据时,这个显式字典特别有用。我们稍后会扩展这一点。
在这个简单的情况下,我们也可以使用一个有用的简写方法,而不是传递字典。
.whenNotMatchedInsertAll().execute()
whenNotMatchedInsertAll
默认为插入所有源字段。
让我们确认结果符合我们的预期:我们的 people
表最初有 3 行,现在应该有来自新数据集的两行额外行。
people_table.toDF().show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 0| Bob| 23|
| 1| Sue| 25|
| 2| Jim| 27|
| 3|Sally| 30|
| 4|Henry| 33|
+---+-----+---+
我们一开始就说过,Delta Lake 合并非常适合在不覆盖整个表的情况下应用更改。让我们列出数据文件并仔细查看 _delta_log
中的提交文件,以确认我们只添加了文件,没有覆盖任何旧数据。
> ls /tmp/people
_delta_log
part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet
part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet
> cat /tmp/people/_delta_log/00000000000000000000.json
…
{"add":{"path":"part-00000-15d1e82e-2f94-4a91-a143-f970a973fccc-c000.snappy.parquet", ... }
> cat /tmp/people/_delta_log/00000000000000000001.json
…
{"add":{"path":"part-00000-98425a3b-ca1c-4efd-be4a-3c843b765299-c000.snappy.parquet", ... }
我们可以在上面看到,Delta 表由两个 parquet 文件组成,它们分别在两次单独的提交中添加。第一次提交是原始写入,用于设置数据,第二次提交是合并操作。Delta Lake merge
足够智能,不会覆盖任何旧数据,只插入新行。
现在,让我们用更复杂的数据批次来使事情更有趣,以 merge
到我们的人员数据集中。
带 whenMatchedUpdate 的 Delta Lake 合并
在前面的示例中,我们知道 source
数据的第一个与目标数据完全相同,因此我们只是忽略了它。这次我们将创建一个更真实的数据集,它具有相同的 id
,但 age
值不同,并在我们的 merge
查询中添加一个新子句来处理这个新数据。
new_data = [
(4, "Henry", 34),
(5, "Allie", 22),
]
new_df = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)
上面是我们的新源数据。代表 Henry 的第一行已经在我们的目标数据中,但这批新数据更新了他的 age
,因为他的生日已经过去了。我们还有 Allie 的一行新数据,类似于我们的第一个示例。
这是处理此数据的合并查询。
people_table.alias("target").merge(
new_df.alias("source"), "target.id = source.id"
).whenMatchedUpdate(set={"age": "source.age"}).whenNotMatchedInsertAll().execute()
我们对第一个示例中的合并查询进行了两处更改。
第一个更改是我们将 whenMatchedInsert
切换为 whenMatchedInsertAll
。正如我们之前所指出的,当源和目标模式相同时,此简写很有用,因此我们可以安全地插入目标数据中尚不存在的所有新值。
第二个更改是我们引入了 whenMatchedUpdate
子句来处理 Henry 的年龄更新。
.whenMatchedUpdate(set={"age": "source.age"})
whenMatchedUpdate
子句需要一个字典,就像 whenNotMatchedInsert
一样,告诉它在找到匹配项时(即满足连接条件时)更新 target
数据中的哪些值。在这种情况下,我们明确告诉 whenMatchedUpdate
只更新 age
,尽管我们也可以添加一个或多个剩余字段。在这种情况下,我们恰好知道没有人的 id
或 name
发生了变化。
请注意,与第一个示例中的 whenNotMatchedInsert
一样,whenMatchedUpdate
也有一个有用的简写,当我们知道可以使用源数据安全地更新所有目标字段而无需显式地在字典中传递它们时:whenMatchedUpdateAll
。
让我们确认我们的人员数据集已更新 Henry 的年龄并将 Allie 插入到人员数据集中。
people_table.toDF().show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 3|Sally| 30|
| 4|Henry| 34|
| 5|Allie| 22|
| 0| Bob| 23|
| 1| Sue| 25|
| 2| Jim| 27|
+---+-----+---+
通过这两个示例,我们已经可以处理将各种源数据合并到我们的目标 Delta Lake 表中。
在我们继续更高级的示例之前,让我们用传统方法回顾我们的第一个示例。这将使我们更深入地了解合并的工作原理,并揭示 Delta Lake 如何为您的数据集应用更改提供更安全、更灵活、更快速的方法。
Delta Lake 合并与 Parquet 表合并
让我们设置我们开始的相同示例,但这次使用传统的 Hive 风格 Parquet 表。
data = [(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]
df = spark.createDataFrame(data).toDF("id", "name", "age")
df.repartition(1).write.format("parquet").save("/tmp/parquet/people")
target = spark.read.format("parquet").load("/tmp/parquet/people")
new_data = [
(0, "Bob", 23),
(3, "Sally", 30),
(4, "Henry", 33),
]
source = spark.createDataFrame(new_data).toDF("id", "name", "age").repartition(1)
请注意,我们为了清晰起见,明确地将上面刚刚写入的数据集重新读取为 DataFrame
target
。
要合并这两个数据集,我们需要连接它们并手动编写表达式,以有条件地在源数据和目标数据之间进行选择。
source_prefix = source.select([F.col(c).alias("source_" + c) for c in source.columns])
target_prefix = target.select([F.col(c).alias("target_" + c) for c in target.columns])
joined_df = source_prefix.join(
target_prefix, target_prefix.target_id == source_prefix.source_id, "full_outer"
)
final_df = joined_df.select(
F.expr("CASE WHEN target_id IS NULL THEN source_id ELSE target_id END").alias("id"),
F.expr("CASE WHEN target_name IS NULL THEN source_name ELSE target_name END").alias(
"name"
),
F.expr("CASE WHEN target_age IS NULL THEN source_age ELSE target_age END").alias(
"age"
),
)
这里有很多需要解释的地方,所以让我们解释一下我们做了什么。
- 分别在
source
和target
数据中的所有列添加前缀。我们需要这样做,以便以后在合并逻辑中可以区分和引用我们的source
和target
列。 - 在匹配条件(
id
相同)上连接我们的source
和target
。我们使用full_outer
连接,因为我们的最终表中需要来自源和目标的所有行。 - 我们处理
joined_df
的每一行,并为每一列有条件地选择source
或target
值,具体取决于数据是否存在于target
中。这大致模拟了合并的whenNotMatchedInsert
子句为我们所做的事情。 - 我们使用
final_df
写入一个全新的人员数据集。
这真是大量的手动和繁琐的工作!只需将其与第一个示例中合并命令的几行代码进行比较。这应该清楚地表明,手动编写基于 Parquet 表的 merge
逻辑非常困难。很容易出错,也很难发现错误。
在这个例子中,我们甚至没有考虑所有可能性和边缘情况。我们必须进一步扩展我们的条件表达式,才能使这种方法适用于我们的第二个例子,该例子除了插入新行外,还增加了对特定字段的更新。为了简单起见,我们利用了我们已经知道所有数据的事实。
最后,我们不得不重写整个现有表。请注意,我们将 final_df
写入了一个新位置,因为我们无法在读取现有表的同时覆盖它。因此,这种方法不仅容易出错且难以理解,而且速度极慢。想象一下对一个 TB 大小的表这样做!
好消息是,我们刚刚做的很多事情都由 Delta Lake merge
在后台为我们完成了,而且方式非常相似。我们必须连接两个数据集并逐行处理连接结果的一般思想保持不变。在未来的文章中,我们将更仔细地研究实际的 Delta Lake 合并实现,以探索它改进这种通用方法以实现数量级更好性能的一些方式。
目前,让我们回到 Delta Lake 合并命令的安全性和速度,并展示一些更高级的示例。
使用合并应用变更数据
在我们的前两个示例中,我们收到的新数据与我们现有数据非常相似。新数据集具有相同的模式,并且没有缺失值。大多数真实世界的场景并非如此方便。
我们更有可能收到一个具有不同模式,并且具有意外甚至缺失值的数据集。让我们看看变更数据捕获模式中出现的两个常见场景来演示这一点。
Delta Lake 合并用于完整变更数据
让我们回顾一下我们 people
数据集的状态,并创建一个新的变更集,我们将应用它。
people_table.toDF().show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 0| Bob| 23|
| 1| Sue| 25|
| 2| Jim| 27|
| 3|Sally| 30|
| 4|Henry| 33|
+---+-----+---+
new_data = [
(9, "Richard", 75, "INSERT"),
(3, "Sally", 31, "UPDATE"),
(0, "Bob", 23, "DELETE"),
]
new_df = spark.createDataFrame(new_data).toDF("id", "name", "age", "_op").repartition(1)
当处理来自上游源的变更数据时,我们通常会得到生成它的操作类型。在上面的 new_df
中,我们现在有一个新的 _op
列,它告诉我们操作是 INSERT
、UPDATE
还是 DELETE
。
我们的 new_df
每种操作类型都有一个示例:Richard 的新行的 INSERT
、Sally 的 UPDATE
和 Bob 的 DELETE
。
这是我们处理此数据的新合并查询的样子:
people_table.alias("target").merge(
new_df.alias("source"), "target.id = source.id"
).whenNotMatchedInsert(
condition='source._op = "INSERT"',
values={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedUpdate(
condition='source._op = "UPDATE"',
set={"id": "source.id", "name": "source.name", "age": "source.age"},
).whenMatchedDelete(
condition='source._op = "DELETE"'
).execute()
merge
语句现在比我们之前的两个示例稍微长一些,但仍然遵循相同的原则。只有两个区别需要强调:
第一个区别是我们添加了一个 whenMatchedDelete
子句。顾名思义,此子句将删除匹配我们的合并条件 target.id = source.id
以及我们传递给 whenMatchedDelete
子句的附加条件的任何行。这就是我们使用 _op
列检查操作是否为 DELETE
的地方。我们必须这样做,因为我们必须区分执行 UPDATE
的匹配和执行 DELETE
的匹配。请注意,我们还在其他子句中添加了条件参数,以考虑所有三种可能性。
第二个区别是我们不再使用 whenMatchedInsertAll
或 whenMatchedUpdateAll
简写。这是因为我们的新数据与我们的 target
人员表没有相同的模式,因此 Delta Lake 的模式强制功能会抛出错误。这只有在我们启用了模式演化的情况下才能工作,您可以在此处阅读相关内容。通常,您希望尽可能明确,以避免意外行为。
在这个示例中,我们仍然很幸运地在变更数据中获得了每个字段的值。如果我们只有部分数据,我们如何使用 merge
?让我们看一个示例。
Delta Lake 合并用于部分变更数据
大型系统通常通过仅提供部分更新来优化变更数据。对于宽数据集,甚至只是任意字符串或 JSON Blob 等大型数据类型,传输和重新处理整个行都是浪费的。更有效的方法是只发送更改的数据,而将其余数据排除。
让我们回顾一下我们的人员表并创建最终变更集以展示它的样子。
people_table.toDF().show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 1| Sue| 25|
| 2| Jim| 27|
| 3| Sally| 31|
| 4| Henry| 34|
| 5| Allie| 22|
| 9|Richard| 75|
+---+-------+---+
new_data = [
(1, "SueNew", None, "UPDATE"),
(3, None, 32, "UPDATE"),
]
new_df = spark.createDataFrame(new_data).toDF("id", "name", "age", "_op").repartition(1)
这次我们的新数据在 name
和 age
列中都有缺失值:Sue 将她的名字改为 SueNew,Sally 的年龄又增加了一岁。
既然我们知道只有更新,我们将只编写更新子句以求简洁。这是我们新的合并查询的样子。
people_table.alias("target").merge(
new_df.alias("source"), "target.id = source.id"
).whenMatchedUpdate(
condition='source._op = "UPDATE"',
set={
"id": "source.id",
"name": "CASE WHEN source.name IS NOT NULL THEN source.name ELSE target.name END",
"age": "CASE WHEN source.age IS NOT NULL THEN source.age ELSE target.age END",
},
).execute()
正如我们在第一个示例中指出的,这就是我们传递给子句的显式字典显示其灵活性的地方。我们有权将任意表达式作为值传递,而不仅仅是字段名。在这种情况下,我们明确检查 source
值是否可用,如果不可用,我们只需告诉 merge
保留 target
值。
这些表达式可能看起来与我们的旧版示例相似,但存在一个重要的区别。在我们的旧版 Hive 风格 Parquet 表示例中,我们使用这些表达式来模拟我们的子句,而在本例中,我们使用它们来用更多条件扩充我们的子句。如果我们要编写此示例的旧版等效代码,我们将需要为每个字段编写所有可能的排列!
让我们最后一次确认此合并正确更新了 Sue 的姓名和 Sally 的年龄。
people_table.toDF().show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 9|Richard| 75|
| 5| Allie| 22|
| 1| SueNew| 25|
| 3| Sally| 32|
| 2| Jim| 27|
| 4| Henry| 34|
+---+-------+---+
结论
在本文中,我们构建并探索了 Delta Lake merge
命令的全部范围。
我们演示了 Delta Lake merge
如何成为最强大、最灵活的命令,适用于您需要高效地对 Delta Lake 表应用选择性更改的场景。
我们学习了如何使用可用的 merge
子句执行所有三种数据操纵操作:INSERT
、UPDATE
和 DELETE
,以及如何使用附加条件扩展这些操作以支持更复杂的逻辑。
我们还学习了如何处理缺失值和不同模式的合并,这在变更数据捕获用例中很常见。
Delta Lake 合并命令支持的内容远超本文所能涵盖。例如,当每个子句都有不同的附加条件时,我们还可以支持多个相同的子句。当您希望根据特定行的内容执行不同类型的 INSERT
或 UPDATE
操作时,这非常有用。我们将在未来的文章中继续涵盖更高级的用例。
以下是一些额外的资源,可帮助巩固您对 Delta Lake 合并命令的理解。此列表包括官方文档、视频,甚至一些可用于简化因维护缓慢变化维度等用例而产生的复杂合并语句的帮助库。
Delta Lake PySpark 文档页面 合并 — Delta Lake 文档 技术讲座 | 深入 Delta Lake 第三部分:DELETE、UPDATE 和 MERGE 如何工作 mack - Delta Lake Python 帮助方法