The Linux Foundation Projects
Delta Lake

Delta Lake 约束和检查

作者:Matthew Powers

本文向您展示了如何向 Delta Lake 添加约束,以防止不良数据添加到您的 Delta 表中。

Delta Lake 默认允许您将任何具有匹配数据类型的数据附加到您的 Delta 表中。例如,您可以将任何整数值附加到整数类型的列中。为了限制可以附加到特定列的整数类型,您可以添加约束。

例如,假设您有一个带 age 列的 Delta 表。您可能希望向 Delta 表添加一个约束,以防止负值添加到 age 列,因为它们没有多大意义。

让我们创建一个 Delta 表,添加一个约束,然后看看它如何阻止我们附加不良数据。

如果您想跟着操作,本文中涵盖的所有代码都在此笔记本中。

Delta Lake 检查约束

创建一个 Delta 表,其中包含 ageword 列以及几行数据。

df = spark.createDataFrame(
    [
        (1, "foo"),
        (2, "bar"),
    ],
    ["age", "word"],
)

df.write.format("delta").saveAsTable("random1")

添加一个约束,防止负值添加到 age 列中。

spark.sql("ALTER TABLE default.random1 ADD CONSTRAINT ageIsPositive CHECK (age >= 0)")

运行 SHOW TBLPROPERTIES 以确认已添加约束。

spark.sql("SHOW TBLPROPERTIES random1").show(truncate=False)

+-------------------------------+--------+
|key                            |value   |
+-------------------------------+--------+
|delta.constraints.ageispositive|age >= 0|
|delta.minReaderVersion         |1       |
|delta.minWriterVersion         |3       |
+-------------------------------+--------+

现在创建一个包含一行负年龄和一行正年龄的 DataFrame。尝试将此 DataFrame 附加到 Delta 表中。

df = spark.createDataFrame(
    [
        (-3, "red"),
        (4, "blue"),
    ],
    ["age", "word"],
)

df.write.format("delta").mode("append").saveAsTable("random1")

由于约束,Delta Lake 不允许您将此数据附加到 Delta 表中。这是错误消息:

org.apache.spark.sql.delta.schema.DeltaInvariantViolationException:
CHECK constraint ageispositive (age >= 0) violated by row with values:
 - age : -3

读取 random1 表的内容并验证没有附加额外数据。

spark.table("random1").show()

+---+----+
|age|word|
+---+----+
|  1| foo|
|  2| bar|
+---+----+

Delta 表拒绝了附加操作,因为 DataFrame 包含一行负年龄的数据,这违反了约束。请注意,没有附加任何数据。如果有任何值不满足约束,则不会附加任何值。Delta Lake 支持 ACID 保证,这意味着所有数据都将被附加或所有数据都不会被附加。这些保证在生产数据工作负载中至关重要。

如果您想将此数据添加到 Delta 表中,则必须在附加之前筛选掉不满足约束的值。

df.filter(F.col("age") >= 0).write.format("delta").mode("append").saveAsTable("random1")

spark.table("random1").show()

+---+----+
|age|word|
+---+----+
|  4|blue|
|  1| foo|
|  2| bar|
+---+----+

让我们看看另一种类型的约束,它通常对您的 Delta 表很有用。

Delta Lake 非空约束

您可以向 Delta 表列添加 NOT NULL 约束,以防止附加空数据。此约束通常是可取的,因为您的 Delta 表中通常会有一些列,您不希望它们包含任何 NULL 数据。

创建一个名为 letters 的 Delta 表,其中包含 letter1letter2 列。

spark.sql("""
CREATE TABLE default.letters (
    letter1 STRING,
    letter2 STRING NOT NULL
  ) USING DELTA;
""")

附加一些不包含任何 NULL 值的数据,并验证可以附加良好数据。

df = spark.createDataFrame(
    [
        ("a", "aa"),
        ("b", "bb"),
    ],
    ["letter1", "letter2"],
)

df.write.format("delta").mode("append").saveAsTable("letters")

spark.table("letters").show()

+-------+-------+
|letter1|letter2|
+-------+-------+
|      b|     bb|
|      a|     aa|
+-------+-------+

尝试附加一些在 letter2 列中包含 NULL 的数据,并观察错误消息。

df = spark.createDataFrame(
    [
        ("c", None),
        ("d", "dd"),
    ],
    ["letter1", "letter2"],
)

df.write.format("delta").mode("append").saveAsTable("letters")

这是引发的错误消息:

org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: NOT NULL constraint violated for column: letter2.

Delta Lake 使您可以轻松地防止将 NULL 值添加到列中。

Delta Lake 非空约束与 DataFrame 可空属性

PySpark DataFrame 具有指定每个列的名称、数据类型和可空属性的模式。可空属性决定了每个列是否可以采用空值。

可空 DataFrame 属性不提供 NOT NULL 约束提供的相同保证。例如,当读取包含空值的 CSV 文件时,PySpark 会自动将 nullable 属性设置为 True,即使您尝试将其显式设置为 False

假设您有一个包含以下数据的 CSV 文件:

col1,col2
hi,bye
hola,

将此 CSV 文件读取到 DataFrame 中,并尝试将 col2 可空属性设置为 False

from pyspark.sql.types import StringType, StructType

schema = StructType().add("col1", StringType(), True).add("col2", StringType(), False)

df = (
    spark.read.format("csv")
        .option("header", True)
       .schema(schema)
       .load("../../data/small_file.csv")
)

查看 DataFrame 的内容

df.show()

+----+----+
|col1|col2|
+----+----+
|  hi| bye|
|hola|null|
+----+----+

现在查看 DataFrame 的模式

df.printSchema()

root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)

col2 的可空属性设置为 True,尽管我们在读取数据时尝试将其显式设置为 False

此博客文章不涵盖所有可空属性的边界情况。就本文而言,您只需要了解可空属性不提供 NOT NULL 约束提供的保证。

NOT NULL 约束可防止包含空值的数据附加到您的 Delta 表中。

向现有 Delta 表添加约束

让我们看看当您向已经包含数据的 Delta 表添加检查时会发生什么。

创建一个名为 random2 的 Delta 表,其中包含 ageword 列。

df = spark.createDataFrame(
    [
        (-45, "hi"),
        (2, "bye"),
    ],
    ["age", "word"],
)

df.write.format("delta").saveAsTable("random2")

查看 random2 表的内容。

spark.table("random2").show()

+---+----+
|age|word|
+---+----+
|  2| bye|
|-45|  hi|
+---+----+

其中一行具有负 age 值,另一行具有正 age 值。让我们看看如果您尝试添加一个约束,要求 Delta 表中的所有数据行都具有正 age 值,会发生什么。

spark.sql("ALTER TABLE default.random2 ADD CONSTRAINT ageIsPositive CHECK (age >= 0)")

此命令将报错,并显示以下消息:

AnalysisException: 1 rows in default.random2 violate the new CHECK constraint (age >= 0)

如果现有表不满足布尔检查,Delta Lake 不允许您应用约束。

结论

约束对于确保不良数据不会添加到您的 Delta 表中至关重要。有时您会希望 Delta 表能够摄取所有数据(干净和垃圾),但其他时候您会希望 Delta 表严格,只接受干净的数据。

Delta Lake 约束使您可以在数据附加到 Delta 表之前轻松验证数据。每当您想阻止特定类型的数据添加到列时,请务必使用 Delta Lake 约束。

请记住,Delta Lake 约束适用于行级值。列名称和数据类型的模式级检查通过模式强制单独执行。

LinkedIn 上关注我们的作者