The Linux Foundation Projects
Delta Lake

Delta Lake 变更数据源 (CDF)

作者:Nick Karpov, Matthew Powers

Delta Lake 变更数据流 (CDF) 允许您自动跟踪 Delta 表的行级变更。

Delta Lake 实现的变更数据流快速、可扩展且可靠。

变更数据流对于审计、质量控制、调试和智能下游更新非常有用。这篇博客文章将向您展示如何启用变更数据流,并演示生产数据设置中的常见工作流。

让我们从一个简单的示例开始,展示如何在创建 Delta 表时启用变更数据流以及如何查询 CDF。如果您想跟着这些计算操作,请参阅此笔记本

Delta Lake 变更数据流示例

本节演示如何创建一个启用了 CDF 的 Delta 表,并读取流中包含的行级变更信息。

让我们首先创建一个名为students的表,该表具有first_nameage字段。

spark.sql(
    "CREATE TABLE people (first_name STRING, age LONG) USING delta TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

让我们向 Delta 表追加一些数据

df = spark.createDataFrame([("Bob", 23), ("Sue", 25), ("Jim", 27)]).toDF(
    "first_name", "age"
)

df.write.mode("append").format("delta").saveAsTable("people")

现在让我们从 Delta 表中删除一行数据

delta_table = DeltaTable.forName(spark, "people")
delta_table.delete(F.col("first_name") == "Sue")

让我们查询变更数据流以查看它包含哪些数据

(
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("people")
    .show(truncate=False)
)

+----------+---+------------+---------------+-----------------------+
|first_name|age|_change_type|_commit_version|_commit_timestamp      |
+----------+---+------------+---------------+-----------------------+
|Sue       |25 |delete      |2              |2023-04-20 08:35:53.303|
|Bob       |23 |insert      |1              |2023-04-20 08:33:19.561|
|Sue       |25 |insert      |1              |2023-04-20 08:33:19.561|
|Jim       |27 |insert      |1              |2023-04-20 08:33:19.561|
+----------+---+------------+---------------+-----------------------+

变更数据流维护着对 Delta 表进行的所有行级变更的记录。

现在让我们看看 Delta Lake 是如何在幕后实现 CDF 的。

Delta Lake 变更数据流为何具有可扩展性

Delta 表旨在存储大量数据,因此 Delta Lake CDF 也被设计为可扩展的。

Delta Lake CDF 的实现使其适用于大数据集,因此您永远不必担心变更数据流会变得太大。CDF 条目通常不会给写入操作增加太多开销。

以下是上一节中 Delta 表中包含的文件

spark-warehouse/people
├── _change_data
│   └── cdc-00000-1fedcc32-6734-48c2-ab4e-97c5ba65f2f4.c000.snappy.parquet
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
├── part-00000-a90e51ff-c595-47d2-a2b3-c1c161102e8e-c000.snappy.parquet
└── part-00000-edd0d32f-3a48-416a-8f3b-bcce9eb5aa25.c000.snappy.parquet

请注意,变更数据流专门需要的信息存储在_change_data文件夹中。

_delta_log有三个条目,对应于以下事务

  • 00000000000000000000.json(简写00.json):创建启用了变更数据流的 Delta 表的事务
  • 01.json:将数据插入 Delta 表
  • 02.json:从 Delta 表中删除行

CDF 的实现很智能,因此01.json事务不会向 Delta 表添加额外的 Parquet 文件。插入事务可以从主 Delta 表的 Parquet 文件中推断出来,数据不需要重复。没有理由不必要地在_change_data文件夹中添加包含重复数据的文件,从而使 Delta 表的整体大小膨胀。

02.json事务需要将一个 Parquet 文件添加到_change_data文件夹。此 Parquet 文件将包含一行数据

+----------+---+------------+
|first_name|age|_change_type|
+----------+---+------------+
|       Sue| 25|      delete|
+----------+---+------------+

这是02.json事务日志条目的缩写表示

{
  "commitInfo": {
    "timestamp": 1685274696904,
    "operation": "DELETE",

  }
}
{
  "remove": {
    "path": "part-00000-a90e51ff-c595-47d2-a2b3-c1c161102e8e-c000.snappy.parquet",

  }
}
{
  "add": {
    "path": "part-00000-edd0d32f-3a48-416a-8f3b-bcce9eb5aa25.c000.snappy.parquet",

  }
}
{
  "cdc": {
    "path": "_change_data/cdc-00000-1fedcc32-6734-48c2-ab4e-97c5ba65f2f4.c000.snappy.parquet",
    "partitionValues": {},
    "size": 997,
    "dataChange": false
  }
}

变更数据流的更改会像 Delta 表的其他更改一样记录在事务日志中。

本节提供了关于变更数据流如何实现的一个很好的概念性概述。让我们看看如何使用 CDF 来执行更好的增量更新。

Delta Lake 变更数据流用于增量更新

Delta Lake CDF 可以用于最小化地更新下游聚合,而不是重新计算整个表。

假设您有另一个表,用于跟踪客户的累计购买量。此聚合每天增量更新。

创建启用了变更数据流的customer_purchases

spark.sql(
    """
CREATE TABLE IF NOT EXISTS customer_purchases (customer_id LONG, transaction_date DATE, price DOUBLE)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
)

现在向customer_purchases表追加一些数据

df = spark.createDataFrame(
    [
        (1, datetime.date(2023, 1, 1), 2.1),
        (2, datetime.date(2023, 1, 5), 3.2),
        (3, datetime.date(2023, 1, 8), 4.4),
        (1, datetime.date(2023, 1, 8), 5.5),
    ]
).toDF("customer_id", "transaction_date", "price")

df.write.mode("append").format("delta").saveAsTable("customer_purchases")

现在创建一个cumulative_purchases表,用于跟踪每个客户的累计购买量

spark.sql(
    """
CREATE TABLE IF NOT EXISTS cumulative_purchases (customer_id LONG, last_transaction DATE, purchases DOUBLE)
USING delta
"""
)

现在填充cumulative_purchases

def agg_customer_purchases(df):
    return df.groupBy("customer_id").agg(
        F.max("transaction_date").alias("last_transaction"),
        F.sum("price").alias("purchases"),
    )

spark.table("customer_purchases").transform(agg_customer_purchases).write.format(
    "delta"
).mode("append").saveAsTable("cumulative_purchases")

检查cumulative_purchases表的内容,确保它已正确填充

spark.sql("select * from cumulative_purchases").show()

+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
|          1|      2023-01-08|      7.6|
|          3|      2023-01-08|      4.4|
|          2|      2023-01-05|      3.2|
+-----------+----------------+---------+

创建一个包含另一组客户购买数据的 DataFrame,其中一个购买数据是已计数购买的重复项

df = spark.createDataFrame(
    [
        (1, datetime.date(2023, 1, 1), 2.1),  # duplicate transaction from earlier
        (1, datetime.date(2023, 1, 12), 10.1),
        (1, datetime.date(2023, 1, 15), 12.2),
        (3, datetime.date(2023, 1, 22), 14.4),
    ]
).toDF("customer_id", "transaction_date", "price")

利用变更数据流,用新数据最小化更新cumulative_purchases

cdf = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("customer_purchases")
)

new_transactions = df.join(
    cdf, ["customer_id", "transaction_date", "price"], "leftanti"
)

new_df = new_transactions.transform(agg_customer_purchases)

cumulative_purchases_table = DeltaTable.forName(spark, "cumulative_purchases")

cumulative_purchases_table.alias("target").merge(
    new_df.alias("source"), "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    set={"purchases": "source.purchases + target.purchases"}
).whenNotMatchedInsertAll().execute()

查看cumulative_purchases表的内容,确保它已正确增量更新,而没有计数重复记录。

cumulative_purchases_table.toDF().show()

+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
|          1|      2023-01-08|     29.9|
|          2|      2023-01-05|      3.2|
|          3|      2023-01-08|     18.8|
+-----------+----------------+---------+

如果没有变更数据流,您需要每次执行去重作业时都完全重建聚合表。启用了 CDF 后,您可以识别customer_purchases表中已删除的行,并为具有去重行的客户最小化更新cumulative_purchases表。

重建整个聚合可能非常昂贵。最小化更新聚合可以节省时间和金钱。在此处了解更多关于Databricks 如何提供像这样的高级物化视图功能

结论

Delta Lake 变更数据流在各种情况下都很有用,并且可以轻松地为新表或现有表启用。

对于某些表,您会希望为审计目的启用 CDF。在这些情况下,您只需启用 CDF,然后忘记它,直到您需要运行临时查询。CDF 与表的其余部分一样受保留期限制,因此请务必阅读文档并适当实施 vacuum 命令,尤其是当您需要为审计目的保留文件时。

在其他情况下,您会特意启用 CDF,以智能地执行最小的下游操作。

CDF 是 Delta Lake 的另一个强大功能,您应该查看官方文档以了解更多信息。

LinkedIn 上关注我们的作者