The Linux Foundation Projects
Delta Lake

Delta Lake replaceWhere

作者:Matthew Powers

这篇博客文章解释了如何使用 Delta Lake 的 replaceWhere 功能,根据过滤条件执行选择性覆盖。

replaceWhere 是 Delta Lake overwrite 函数的一个特例,它允许你按如下方式覆盖表的一个子集:

(
    df.write.format("delta")
    .option("replaceWhere", "number > 2")
    .mode("overwrite")
    .save("tmp/my_data")
)

当你没有指定 replaceWhere 时,覆盖保存模式将替换整个表。你可能希望只覆盖表中需要更改的特定部分,而不是更新整个表(这成本很高!)。在这种情况下,你可以使用 replaceWhere 操作只覆盖相关的记录或分区。

选择性覆盖可以节省你的时间和计算开销,尤其是在表被高效分区的情况下。

这篇博文将通过代码示例,带你了解 replaceWhere 最常见的用例。我们将从一个简单的例子开始,然后逐步增加复杂性,看看 replaceWhere 如何应用于分区 Delta 表。最后,你将了解何时以及如何使用 replaceWhere 来加快你的查询和选择性覆盖操作。

我们开始吧 🪂

简单的 Delta Lake replaceWhere 示例

假设我们有一些数据存储在磁盘上的 Delta Lake 中,路径是 tmp/my_data。Delta 表包含两列 letternumber,以及以下数据行:

+------+------+
|letter|number|
+------+------+
|     a|     1|
|     b|     2|
|     c|     3|
|     d|     4|
+------+------+

假设我们想用以下 DataFrame (df2) 更新此表:

+------+------+
|letter|number|
+------+------+
|     x|     7|
|     y|     8|
|     z|     9|
+------+------+

假设我们特别看重 number <= 2 的记录,并且我们绝对需要在最终表中保留这些记录。df1 中的其他记录可以被替换。

在这种情况下,常规的覆盖操作将不起作用。它会简单地覆盖所有内容,我们将丢失珍贵的 number <= 2 记录。

相反,你可以使用 replaceWheredf2 的内容覆盖 Delta 表的行,但**仅当记录符合特定条件时**。在这种情况下,条件将是 number > 2

(
    df2.write.format("delta")
    .option("replaceWhere", "number > 2")
    .mode("overwrite")
    .save("tmp/my_data")
)

重新读取更新后的 Delta Lake,我们可以看到只有 number > 2 的行被覆盖了。

spark.read.format("delta").load("tmp/my_data").show()

+------+------+
|letter|number|
+------+------+
|     a|     1|
|     b|     2|
|     x|     7|
|     y|     8|
|     z|     9|
+------+------+

这是一个简单的例子,旨在说明 replaceWhere 的基本概念。让我们看一个稍微复杂一点的例子,以便更接近实际应用。

Delta Lake replaceWhere 用于分区表

让我们更进一步,看看当你想要对存储在多个分区中的记录执行选择性覆盖时会发生什么。

本节使用 Delta Lake >= 2.0;如果你正在运行旧版 Delta Lake,请参见此处

假设你有一个按 country 分区的 Delta 表,其中包含以下数据:

+----------+---------+---------+-------------+
|first_name|last_name|  country|    continent|
+----------+---------+---------+-------------+
|   Ernesto|  Guevara|Argentina|South America|
|  Wolfgang|   Manche|  Germany|       Europe|
|    Soraya|     Jala|  Germany|       Europe|
|   Jasmine| Terrywin| Thailand|         Asia|
|   Janneke|    Bosma|  Belgium|       Europe|
|     Hamed|   Snouba|  Lebanon|         Asia|
|     Bruce|      Lee|    China|         Asia|
|      Jack|       Ma|    China|         Asia|
+----------+---------+---------+-------------+

现在你的经理要求你进行一些基本的数据匿名化,以保护数据集中主题的隐私。你再次面临紧迫的截止日期,因此你只想对团队相关的记录执行此操作:即 continent = Asia 的记录。

你定义了一个匿名化函数(希望它比下面的函数更难破解😀)来匿名化姓氏:

from pyspark.sql.functions import translate

def anonymizeLastname(df):
    return df.withColumn('last_name', translate('last_name', 'aeiou', '12345'))

此算法使用 translate 函数将字符替换为其指定的对应字符。在此示例中,a 被替换为 1e 被替换为 2i 被替换为 3,依此类推。如前所述,这不是一个安全的匿名化算法,但足以用于我们的演示。

让我们在 DataFrame 上对所有**来自 Asia 的记录**运行此函数,以查看该函数在内存中如何工作:

df.where(col("continent") == "Asia").transform(anonymizeLastname)
df.show()

+----------+---------+--------+---------+
|first_name|last_name| country|continent|
+----------+---------+--------+---------+
|   Jasmine| T2rryw3n|Thailand|     Asia|
|     Hamed|   Sn45b1| Lebanon|     Asia|
|     Bruce|      L22|   China|     Asia|
|      Jack|       M1|   China|     Asia|
+----------+---------+--------+---------+

太棒了!该函数已成功应用于内存中的所有 Asia 记录。

现在让我们选择性地将这些新记录写入磁盘。你可以使用 replaceWhere 只覆盖受影响的记录(continent == 'Asia'),即使它们存储在多个 country 分区中。

(
    df.write.format("delta")
    .option("replaceWhere", "continent = 'Asia'")
    .mode("overwrite")
    .save(deltaPath)
)

现在让我们重新读取数据以确认:

spark.read.format("delta").load(deltaPath).show()

+----------+---------+---------+-------------+
|first_name|last_name|country  |continent    |
+----------+---------+---------+-------------+
|Ernesto   |Guevara  |Argentina|South America|
|Vladimir  |Putin    |Russia   |Europe       |
|Maria     |Sharapova|Russia   |Europe       |
|Jasmine   |T2rryw3n |Thailand |Asia         |
|Janneke   |Bosma    |Belgium  |Europe       |
|Hamed     |Sn45b1   |Lebanon  |Asia         |
|Bruce     |L22      |China    |Asia         |
|Jack      |M1       |China    |Asia         |
+----------+---------+---------+-------------+

太棒了,Asia 记录已成功匿名化!

让我们看看这个 replaceWhere 事务是如何记录在 Delta Lake 事务日志中的,以确认只有亚洲分区发生了变化:

{
    "add": {
        "path": "country=Thailand/part-00000-90e36b14-623b-455b-917a-11a6063ecccb.c000.snappy.parquet",
        "partitionValues": {
            "country": "Thailand"
        },
        "size": 1032,
        "modificationTime": 1702406183349,
        "dataChange": true,
        "stats": "{\"numRecords\":1,\"minValues\":{\"first_name\":\"Jasmine\",\"last_name\":\"T2rryw3n\",\"continent\":\"Asia\"},\"maxValues\":{\"first_name\":\"Jasmine\",\"last_name\":\"T2rryw3n\",\"continent\":\"Asia\"},\"nullCount\":{\"first_name\":0,\"last_name\":0,\"continent\":0}}"
    }
}
{
    "add": {
        "path": "country=Lebanon/part-00001-e419556d-7d8d-4263-b6fd-915a4edff62b.c000.snappy.parquet",
        "partitionValues": {
            "country": "Lebanon"
        },
        "size": 1004,
        "modificationTime": 1702406183349,
        "dataChange": true,
        "stats": "{\"numRecords\":1,\"minValues\":{\"first_name\":\"Hamed\",\"last_name\":\"Sn45b1\",\"continent\":\"Asia\"},\"maxValues\":{\"first_name\":\"Hamed\",\"last_name\":\"Sn45b1\",\"continent\":\"Asia\"},\"nullCount\":{\"first_name\":0,\"last_name\":0,\"continent\":0}}"
    }
}
{
    "add": {
        "path": "country=China/part-00002-0a628002-85f9-450c-9340-4b01ef225e0e.c000.snappy.parquet",
        "partitionValues": {
            "country": "China"
        },
        "size": 1002,
        "modificationTime": 1702406183354,
        "dataChange": true,
        "stats": "{\"numRecords\":2,\"minValues\":{\"first_name\":\"Bruce\",\"last_name\":\"L22\",\"continent\":\"Asia\"},\"maxValues\":{\"first_name\":\"Jack\",\"last_name\":\"M1\",\"continent\":\"Asia\"},\"nullCount\":{\"first_name\":0,\"last_name\":0,\"continent\":0}}"
    }
}
{
    "remove": {
        "path": "country=Thailand/part-00005-1f073a57-dca5-4690-9f9c-ffebb5912b75.c000.snappy.parquet",
        "deletionTimestamp": 1702406182360,
        "dataChange": true,
        "extendedFileMetadata": true,
        "partitionValues": {
            "country": "Thailand"
        },
        "size": 1032
    }
}
{
    "remove": {
        "path": "country=Lebanon/part-00002-7594dbc1-85d4-4f08-980f-4a2e56420b3a.c000.snappy.parquet",
        "deletionTimestamp": 1702406182360,
        "dataChange": true,
        "extendedFileMetadata": true,
        "partitionValues": {
            "country": "Lebanon"
        },
        "size": 1004
    }
}
{
    "remove": {
        "path": "country=China/part-00000-e364c08a-1db9-4736-9fa6-c51b7b72caa2.c000.snappy.parquet",
        "deletionTimestamp": 1702406182360,
        "dataChange": true,
        "extendedFileMetadata": true,
        "partitionValues": {
            "country": "China"
        },
        "size": 1002
    }
}

很好,日志证实只有位于亚洲的国家——泰国、黎巴嫩和中国——的分区被此 replaceWhere 操作编辑过。

你可以想象,如果这是一个大型数据集,你将节省大量时间和金钱,因为无需重写所有未受更改影响的其他分区。

replaceWhere 与动态分区

动态分区是另一种覆盖形式,它将覆盖写入新数据的每个分区中的**所有现有数据**。它仅适用于使用 Delta Lake 2.0 或更高版本的用户。

你可以通过将 partitionOverwriteMode 设置为 dynamic 作为对 DataFrameWriter 的调用的一部分来使用动态分区:

(
    df.write.format("delta")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .saveAsTable("default.people10m")
)

使用动态分区时,你需要验证使用动态分区写入的数据是否只触及你预期的分区。如果意外地在不正确的分区中有一行,这可能会导致覆盖原本应该保持不变的整个分区。

通常建议使用 replaceWhere 来指定要覆盖哪些数据。replaceWhere 是一种更精确的功能,因为它强制你指定过滤谓词。

结论

这篇博客文章向你展示了为什么以及如何使用 replaceWhere 选择性地覆盖 Delta 表的部分内容。你已经通过多个示例了解了如何应用 replaceWhere,通过只写入符合特定谓词的记录,使数据重写更高效。你已经在未分区和分区 Delta 表中应用了 replaceWhere,并了解了 replaceWhere 与动态分区的不同用例。

LinkedIn 上关注我们的作者