Delta Lake 约束和检查
本文向您展示了如何向 Delta Lake 添加约束,以防止不良数据添加到您的 Delta 表中。
Delta Lake 默认允许您将任何具有匹配数据类型的数据附加到您的 Delta 表中。例如,您可以将任何整数值附加到整数类型的列中。为了限制可以附加到特定列的整数类型,您可以添加约束。
例如,假设您有一个带 age
列的 Delta 表。您可能希望向 Delta 表添加一个约束,以防止负值添加到 age
列,因为它们没有多大意义。
让我们创建一个 Delta 表,添加一个约束,然后看看它如何阻止我们附加不良数据。
如果您想跟着操作,本文中涵盖的所有代码都在此笔记本中。
Delta Lake 检查约束
创建一个 Delta 表,其中包含 age
和 word
列以及几行数据。
df = spark.createDataFrame(
[
(1, "foo"),
(2, "bar"),
],
["age", "word"],
)
df.write.format("delta").saveAsTable("random1")
添加一个约束,防止负值添加到 age
列中。
spark.sql("ALTER TABLE default.random1 ADD CONSTRAINT ageIsPositive CHECK (age >= 0)")
运行 SHOW TBLPROPERTIES
以确认已添加约束。
spark.sql("SHOW TBLPROPERTIES random1").show(truncate=False)
+-------------------------------+--------+
|key |value |
+-------------------------------+--------+
|delta.constraints.ageispositive|age >= 0|
|delta.minReaderVersion |1 |
|delta.minWriterVersion |3 |
+-------------------------------+--------+
现在创建一个包含一行负年龄和一行正年龄的 DataFrame。尝试将此 DataFrame 附加到 Delta 表中。
df = spark.createDataFrame(
[
(-3, "red"),
(4, "blue"),
],
["age", "word"],
)
df.write.format("delta").mode("append").saveAsTable("random1")
由于约束,Delta Lake 不允许您将此数据附加到 Delta 表中。这是错误消息:
org.apache.spark.sql.delta.schema.DeltaInvariantViolationException:
CHECK constraint ageispositive (age >= 0) violated by row with values:
- age : -3
读取 random1
表的内容并验证没有附加额外数据。
spark.table("random1").show()
+---+----+
|age|word|
+---+----+
| 1| foo|
| 2| bar|
+---+----+
Delta 表拒绝了附加操作,因为 DataFrame 包含一行负年龄的数据,这违反了约束。请注意,没有附加任何数据。如果有任何值不满足约束,则不会附加任何值。Delta Lake 支持 ACID 保证,这意味着所有数据都将被附加或所有数据都不会被附加。这些保证在生产数据工作负载中至关重要。
如果您想将此数据添加到 Delta 表中,则必须在附加之前筛选掉不满足约束的值。
df.filter(F.col("age") >= 0).write.format("delta").mode("append").saveAsTable("random1")
spark.table("random1").show()
+---+----+
|age|word|
+---+----+
| 4|blue|
| 1| foo|
| 2| bar|
+---+----+
让我们看看另一种类型的约束,它通常对您的 Delta 表很有用。
Delta Lake 非空约束
您可以向 Delta 表列添加 NOT NULL
约束,以防止附加空数据。此约束通常是可取的,因为您的 Delta 表中通常会有一些列,您不希望它们包含任何 NULL
数据。
创建一个名为 letters
的 Delta 表,其中包含 letter1
和 letter2
列。
spark.sql("""
CREATE TABLE default.letters (
letter1 STRING,
letter2 STRING NOT NULL
) USING DELTA;
""")
附加一些不包含任何 NULL 值的数据,并验证可以附加良好数据。
df = spark.createDataFrame(
[
("a", "aa"),
("b", "bb"),
],
["letter1", "letter2"],
)
df.write.format("delta").mode("append").saveAsTable("letters")
spark.table("letters").show()
+-------+-------+
|letter1|letter2|
+-------+-------+
| b| bb|
| a| aa|
+-------+-------+
尝试附加一些在 letter2
列中包含 NULL
的数据,并观察错误消息。
df = spark.createDataFrame(
[
("c", None),
("d", "dd"),
],
["letter1", "letter2"],
)
df.write.format("delta").mode("append").saveAsTable("letters")
这是引发的错误消息:
org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: NOT NULL constraint violated for column: letter2.
Delta Lake 使您可以轻松地防止将 NULL
值添加到列中。
Delta Lake 非空约束与 DataFrame 可空属性
PySpark DataFrame 具有指定每个列的名称、数据类型和可空属性的模式。可空属性决定了每个列是否可以采用空值。
可空 DataFrame 属性不提供 NOT NULL
约束提供的相同保证。例如,当读取包含空值的 CSV 文件时,PySpark 会自动将 nullable
属性设置为 True
,即使您尝试将其显式设置为 False
。
假设您有一个包含以下数据的 CSV 文件:
col1,col2
hi,bye
hola,
将此 CSV 文件读取到 DataFrame 中,并尝试将 col2
可空属性设置为 False
from pyspark.sql.types import StringType, StructType
schema = StructType().add("col1", StringType(), True).add("col2", StringType(), False)
df = (
spark.read.format("csv")
.option("header", True)
.schema(schema)
.load("../../data/small_file.csv")
)
查看 DataFrame 的内容
df.show()
+----+----+
|col1|col2|
+----+----+
| hi| bye|
|hola|null|
+----+----+
现在查看 DataFrame 的模式
df.printSchema()
root
|-- col1: string (nullable = true)
|-- col2: string (nullable = true)
col2
的可空属性设置为 True
,尽管我们在读取数据时尝试将其显式设置为 False
。
此博客文章不涵盖所有可空属性的边界情况。就本文而言,您只需要了解可空属性不提供 NOT NULL
约束提供的保证。
NOT NULL
约束可防止包含空值的数据附加到您的 Delta 表中。
向现有 Delta 表添加约束
让我们看看当您向已经包含数据的 Delta 表添加检查时会发生什么。
创建一个名为 random2
的 Delta 表,其中包含 age
和 word
列。
df = spark.createDataFrame(
[
(-45, "hi"),
(2, "bye"),
],
["age", "word"],
)
df.write.format("delta").saveAsTable("random2")
查看 random2
表的内容。
spark.table("random2").show()
+---+----+
|age|word|
+---+----+
| 2| bye|
|-45| hi|
+---+----+
其中一行具有负 age
值,另一行具有正 age
值。让我们看看如果您尝试添加一个约束,要求 Delta 表中的所有数据行都具有正 age
值,会发生什么。
spark.sql("ALTER TABLE default.random2 ADD CONSTRAINT ageIsPositive CHECK (age >= 0)")
此命令将报错,并显示以下消息:
AnalysisException: 1 rows in default.random2 violate the new CHECK constraint (age >= 0)
如果现有表不满足布尔检查,Delta Lake 不允许您应用约束。
结论
约束对于确保不良数据不会添加到您的 Delta 表中至关重要。有时您会希望 Delta 表能够摄取所有数据(干净和垃圾),但其他时候您会希望 Delta 表严格,只接受干净的数据。
Delta Lake 约束使您可以在数据附加到 Delta 表之前轻松验证数据。每当您想阻止特定类型的数据添加到列时,请务必使用 Delta Lake 约束。
请记住,Delta Lake 约束适用于行级值。列名称和数据类型的模式级检查通过模式强制单独执行。