Delta Lake 变更数据源 (CDF)
Delta Lake 变更数据流 (CDF) 允许您自动跟踪 Delta 表的行级变更。
Delta Lake 实现的变更数据流快速、可扩展且可靠。
变更数据流对于审计、质量控制、调试和智能下游更新非常有用。这篇博客文章将向您展示如何启用变更数据流,并演示生产数据设置中的常见工作流。
让我们从一个简单的示例开始,展示如何在创建 Delta 表时启用变更数据流以及如何查询 CDF。如果您想跟着这些计算操作,请参阅此笔记本。
Delta Lake 变更数据流示例
本节演示如何创建一个启用了 CDF 的 Delta 表,并读取流中包含的行级变更信息。
让我们首先创建一个名为students
的表,该表具有first_name
和age
字段。
spark.sql(
"CREATE TABLE people (first_name STRING, age LONG) USING delta TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)
让我们向 Delta 表追加一些数据
df = spark.createDataFrame([("Bob", 23), ("Sue", 25), ("Jim", 27)]).toDF(
"first_name", "age"
)
df.write.mode("append").format("delta").saveAsTable("people")
现在让我们从 Delta 表中删除一行数据
delta_table = DeltaTable.forName(spark, "people")
delta_table.delete(F.col("first_name") == "Sue")
让我们查询变更数据流以查看它包含哪些数据
(
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("people")
.show(truncate=False)
)
+----------+---+------------+---------------+-----------------------+
|first_name|age|_change_type|_commit_version|_commit_timestamp |
+----------+---+------------+---------------+-----------------------+
|Sue |25 |delete |2 |2023-04-20 08:35:53.303|
|Bob |23 |insert |1 |2023-04-20 08:33:19.561|
|Sue |25 |insert |1 |2023-04-20 08:33:19.561|
|Jim |27 |insert |1 |2023-04-20 08:33:19.561|
+----------+---+------------+---------------+-----------------------+
变更数据流维护着对 Delta 表进行的所有行级变更的记录。
现在让我们看看 Delta Lake 是如何在幕后实现 CDF 的。
Delta Lake 变更数据流为何具有可扩展性
Delta 表旨在存储大量数据,因此 Delta Lake CDF 也被设计为可扩展的。
Delta Lake CDF 的实现使其适用于大数据集,因此您永远不必担心变更数据流会变得太大。CDF 条目通常不会给写入操作增加太多开销。
以下是上一节中 Delta 表中包含的文件
spark-warehouse/people
├── _change_data
│ └── cdc-00000-1fedcc32-6734-48c2-ab4e-97c5ba65f2f4.c000.snappy.parquet
├── _delta_log
│ ├── 00000000000000000000.json
│ ├── 00000000000000000001.json
│ └── 00000000000000000002.json
├── part-00000-a90e51ff-c595-47d2-a2b3-c1c161102e8e-c000.snappy.parquet
└── part-00000-edd0d32f-3a48-416a-8f3b-bcce9eb5aa25.c000.snappy.parquet
请注意,变更数据流专门需要的信息存储在_change_data
文件夹中。
_delta_log
有三个条目,对应于以下事务
00000000000000000000.json
(简写00.json
):创建启用了变更数据流的 Delta 表的事务01.json
:将数据插入 Delta 表02.json
:从 Delta 表中删除行
CDF 的实现很智能,因此01.json
事务不会向 Delta 表添加额外的 Parquet 文件。插入事务可以从主 Delta 表的 Parquet 文件中推断出来,数据不需要重复。没有理由不必要地在_change_data
文件夹中添加包含重复数据的文件,从而使 Delta 表的整体大小膨胀。
02.json
事务需要将一个 Parquet 文件添加到_change_data
文件夹。此 Parquet 文件将包含一行数据
+----------+---+------------+
|first_name|age|_change_type|
+----------+---+------------+
| Sue| 25| delete|
+----------+---+------------+
这是02.json
事务日志条目的缩写表示
{
"commitInfo": {
"timestamp": 1685274696904,
"operation": "DELETE",
…
}
}
{
"remove": {
"path": "part-00000-a90e51ff-c595-47d2-a2b3-c1c161102e8e-c000.snappy.parquet",
…
}
}
{
"add": {
"path": "part-00000-edd0d32f-3a48-416a-8f3b-bcce9eb5aa25.c000.snappy.parquet",
…
}
}
{
"cdc": {
"path": "_change_data/cdc-00000-1fedcc32-6734-48c2-ab4e-97c5ba65f2f4.c000.snappy.parquet",
"partitionValues": {},
"size": 997,
"dataChange": false
}
}
变更数据流的更改会像 Delta 表的其他更改一样记录在事务日志中。
本节提供了关于变更数据流如何实现的一个很好的概念性概述。让我们看看如何使用 CDF 来执行更好的增量更新。
Delta Lake 变更数据流用于增量更新
Delta Lake CDF 可以用于最小化地更新下游聚合,而不是重新计算整个表。
假设您有另一个表,用于跟踪客户的累计购买量。此聚合每天增量更新。
创建启用了变更数据流的customer_purchases
表
spark.sql(
"""
CREATE TABLE IF NOT EXISTS customer_purchases (customer_id LONG, transaction_date DATE, price DOUBLE)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
)
现在向customer_purchases
表追加一些数据
df = spark.createDataFrame(
[
(1, datetime.date(2023, 1, 1), 2.1),
(2, datetime.date(2023, 1, 5), 3.2),
(3, datetime.date(2023, 1, 8), 4.4),
(1, datetime.date(2023, 1, 8), 5.5),
]
).toDF("customer_id", "transaction_date", "price")
df.write.mode("append").format("delta").saveAsTable("customer_purchases")
现在创建一个cumulative_purchases
表,用于跟踪每个客户的累计购买量
spark.sql(
"""
CREATE TABLE IF NOT EXISTS cumulative_purchases (customer_id LONG, last_transaction DATE, purchases DOUBLE)
USING delta
"""
)
现在填充cumulative_purchases
表
def agg_customer_purchases(df):
return df.groupBy("customer_id").agg(
F.max("transaction_date").alias("last_transaction"),
F.sum("price").alias("purchases"),
)
spark.table("customer_purchases").transform(agg_customer_purchases).write.format(
"delta"
).mode("append").saveAsTable("cumulative_purchases")
检查cumulative_purchases
表的内容,确保它已正确填充
spark.sql("select * from cumulative_purchases").show()
+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
| 1| 2023-01-08| 7.6|
| 3| 2023-01-08| 4.4|
| 2| 2023-01-05| 3.2|
+-----------+----------------+---------+
创建一个包含另一组客户购买数据的 DataFrame,其中一个购买数据是已计数购买的重复项
df = spark.createDataFrame(
[
(1, datetime.date(2023, 1, 1), 2.1), # duplicate transaction from earlier
(1, datetime.date(2023, 1, 12), 10.1),
(1, datetime.date(2023, 1, 15), 12.2),
(3, datetime.date(2023, 1, 22), 14.4),
]
).toDF("customer_id", "transaction_date", "price")
利用变更数据流,用新数据最小化更新cumulative_purchases
cdf = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("customer_purchases")
)
new_transactions = df.join(
cdf, ["customer_id", "transaction_date", "price"], "leftanti"
)
new_df = new_transactions.transform(agg_customer_purchases)
cumulative_purchases_table = DeltaTable.forName(spark, "cumulative_purchases")
cumulative_purchases_table.alias("target").merge(
new_df.alias("source"), "target.customer_id = source.customer_id"
).whenMatchedUpdate(
set={"purchases": "source.purchases + target.purchases"}
).whenNotMatchedInsertAll().execute()
查看cumulative_purchases
表的内容,确保它已正确增量更新,而没有计数重复记录。
cumulative_purchases_table.toDF().show()
+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
| 1| 2023-01-08| 29.9|
| 2| 2023-01-05| 3.2|
| 3| 2023-01-08| 18.8|
+-----------+----------------+---------+
如果没有变更数据流,您需要每次执行去重作业时都完全重建聚合表。启用了 CDF 后,您可以识别customer_purchases
表中已删除的行,并为具有去重行的客户最小化更新cumulative_purchases
表。
重建整个聚合可能非常昂贵。最小化更新聚合可以节省时间和金钱。在此处了解更多关于Databricks 如何提供像这样的高级物化视图功能。
结论
Delta Lake 变更数据流在各种情况下都很有用,并且可以轻松地为新表或现有表启用。
对于某些表,您会希望为审计目的启用 CDF。在这些情况下,您只需启用 CDF,然后忘记它,直到您需要运行临时查询。CDF 与表的其余部分一样受保留期限制,因此请务必阅读文档并适当实施 vacuum 命令,尤其是当您需要为审计目的保留文件时。
在其他情况下,您会特意启用 CDF,以智能地执行最小的下游操作。
CDF 是 Delta Lake 的另一个强大功能,您应该查看官方文档以了解更多信息。