Delta Lake 模式强制
这篇文章将教你 Delta Lake 模式强制,并演示它如何保护你免受将具有不兼容模式的文件添加到 Delta 表中的困扰。
Parquet 表不支持内置模式强制,因此它们接受任何模式的数据,这不一定可取。意外地将 Parquet 文件写入数据湖可能会出人意料地难以撤消。
数据湖(例如 Parquet 表)是读时模式 (schema-on-read),这意味着执行引擎在运行查询时需要确定模式。数据仓库是写时模式 (schema-on-write),这意味着它们在写入数据时检查模式。Delta Lake 提供了数据湖的灵活性,并且也是写时模式,提供了数据仓库的安全性和保证。Delta Lake 模式强制是提供给用户的出色写时模式优势。
让我们首先看看没有关联 Hive 元数据存储条目的 Parquet 表如何不阻止你附加具有不同模式的数据,这可能很危险。我们稍后将讨论存储在 Hive 元数据存储中的 Parquet 表。
Parquet 表没有模式强制
本节演示了 Parquet 表如何没有内置模式强制,因此你可能会错误地将具有不同模式的数据附加到 Parquet 表中。
首先创建一个包含 first_name
和 age
列的 DataFrame,并将其写入 Parquet 表。
columns = ["first_name", "age"]
data = [("bob", 47), ("li", 23), ("leonard", 51)]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.format("parquet").save("tmp/parquet_table1")
现在,让我们将一个具有不同模式的 DataFrame 附加到 Parquet 表中。创建一个包含 first_name
和 favorite_color
列的 DataFrame(与之前不同的模式),并将其附加到现有的 Parquet 表中。
columns = ["first_name", "favorite_color"]
data = [("sal", "red"), ("cat", "pink")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.mode("append").format("parquet").save("tmp/parquet_table1")
PySpark 允许你将具有不同模式的 DataFrame 附加到 Parquet 表中。单个 Parquet 表中的不同模式将发生冲突,读取器将来必须解决该冲突。如果默认情况下写入数据时模式不匹配,查询引擎抛出错误会更好。
不幸的是,PySpark 在处理 Parquet 表时无法运行此预写检查,因为查找底层 Parquet 表的模式将涉及单独检查所有文件,这对于大型 Parquet 表来说会很慢。
将 Parquet 表读取到 DataFrame 中并检查其内容。
spark.read.format("parquet").load("tmp/parquet_table1").show()
+----------+----+
|first_name| age|
+----------+----+
| leonard| 51|
| cat|null|
| sal|null|
| bob| 47|
| li| 23|
+----------+----+
这不是一个很好的结果。PySpark 在读取 Parquet 文件时遇到了两个不同的模式,并且只显示了其中一个。你需要手动将 mergeSchema
设置为 true
,才能在读取数据时看到所有数据。
spark.read.option("mergeSchema", "true").format("parquet").load(
"tmp/parquet_table1"
).show()
+----------+----+--------------+
|first_name| age|favorite_color|
+----------+----+--------------+
| leonard| 51| null|
| cat|null| pink|
| sal|null| red|
| bob| 47| null|
| li| 23| null|
+----------+----+--------------+
同样,这不是 PySpark 的错。考虑到 Parquet 表的读时模式限制,PySpark 正在提供尽可能最好的默认行为。
让我们看看 Delta Lake 如何支持模式强制并提供更好的开箱即用默认行为。
Delta Lake 模式强制是内置的
让我们执行相同的操作,但在 Delta Lake 上执行,看看默认操作与 Parquet 有何不同。首先创建一个 DataFrame 并将其写入 Delta 表。
columns = ["first_name", "age"]
data = [("bob", 47), ("li", 23), ("leonard", 51)]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.format("delta").save("tmp/delta_table1")
创建另一个具有不同模式的 DataFrame,并尝试将其附加到现有的 Delta 表中。
columns = ["first_name", "favorite_color"]
data = [("sal", "red"), ("cat", "pink")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.mode("append").format("delta").save("tmp/delta_table1")
这是你会得到的错误
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 1f0df7a5-dda6-494f-99bc-4732d455db0b).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.
Table schema:
root
-- first_name: string (nullable = true)
-- age: long (nullable = true)
Data schema:
root
-- first_name: string (nullable = true)
-- favorite_color: string (nullable = true)
错误消息提供了操作失败的描述性解释,以及两种不同的方法来启用写入具有不匹配模式的数据。
Delta Lake 默认的模式强制行为是可取的。你不想默认允许将具有不匹配模式的数据添加到 Delta 表中。你只希望在用户明确声明他们想要的情况下才允许模式不匹配。
Delta Lake 写入时将 mergeSchema 设置为 true
你可以通过显式地将 mergeSchema
设置为 true,将具有不同模式的 DataFrame 附加到 Delta 表中。
df.write.option("mergeSchema", "true").mode("append").format("delta").save(
"tmp/delta_table1"
)
读取 Delta 表并检查内容
spark.read.format("delta").load("tmp/delta_table1").show()
+----------+----+--------------+
|first_name| age|favorite_color|
+----------+----+--------------+
| leonard| 51| null|
| cat|null| pink|
| sal|null| red|
| bob| 47| null|
| li| 23| null|
+----------+----+--------------+
请注意,在读取 Delta 表时,你不需要像读取 Parquet 表时那样将 mergeSchema
设置为 true
。当读取包含不同模式文件的 Parquet 表时,你需要手动将 mergeSchema
设置为 true
。
Delta Lake 的默认行为要好得多。Parquet 表的读取者需要以某种方式知道,当他们读取包含不同模式文件的 Parquet 表时,他们需要将 mergeSchema
设置为 true
,这是一个不合理的要求。数据专业人员可能会读取数十或数百个 Parquet 表。不能指望他们记住何时需要将 mergeSchema
设置为 true
以及何时不需要。
Delta Lake 默认启用 autoMerge 以合并模式
你还可以设置一个 Spark 属性,默认启用 autoMerge
。设置此属性后,在将具有不同模式的数据写入 Delta 表时,你无需手动将 mergeSchema
设置为 true
。
以下是启用 autoMerge
的方法
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
创建一个具有另一个模式的 DataFrame,并将其写入现有的 Delta 表。
columns = ["first_name", "country"]
data = [("bill", "usa"), ("xi", "china")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.mode("append").format("delta").save("tmp/delta_table1")
检查 Delta 表的内容并验证新数据是否已正确附加。
spark.read.format("delta").load("tmp/delta_table1").show()
+----------+----+--------------+-------+
|first_name| age|favorite_color|country|
+----------+----+--------------+-------+
| leonard| 51| null| null|
| cat|null| pink| null|
| sal|null| red| null|
| bob| 47| null| null|
| bill|null| null| usa|
| xi|null| null| china|
| li| 23| null| null|
+----------+----+--------------+-------+
我们可以看到数据已附加,并且在对 Delta 表执行写入操作时,我们不需要将 mergeSchema
设置为 true
。
但请注意!
仔细查看启用 autoMerge
的属性,并注意它是 Delta Lake 特有的:spark.databricks.delta.schema.autoMerge.enabled
。
此配置属性不影响 Parquet 读取。即使设置了此属性,在读取 Parquet 表时,你仍然需要像以前一样手动将 mergeSchema
设置为 true
。
spark.read.option("mergeSchema", "true").format("parquet").load(
"tmp/parquet_table1"
).show()
这个例子演示了两个重要的概念。让我们分别讨论它们。
Delta Lake 模式强制 vs 模式演进
模式强制是 Delta Lake 的一项功能,它阻止你将具有不同模式的数据附加到表中,除非你明确指定该表应允许写入具有不同模式的数据。Parquet 表不支持模式强制。如果 Parquet 用户希望确保不会写入具有不同模式的数据,他们需要手动实现模式强制业务逻辑作为预写检查。
模式演进是指表随时间适应不同模式的能力,通常通过允许添加额外的列来实现。我们的示例已经演示了模式演进,但这是一个重要的话题,因此我们将在另一篇博客文章中更深入地探讨它。
Delta Lake 模式强制 vs 约束
模式强制是指在将数据附加到现有表时进行模式级别检查。它指的是某些列和数据类型的存在。
Delta Lake 还支持约束,这些约束是在附加数据时进行值级别检查。例如,你可以添加一个约束,阻止你向给定列添加 null
值。
模式强制和约束是相关的,因为它们都在写入数据之前检查数据的质量,但它们是独立的概念。你通常会希望同时应用模式强制和约束。
模式强制边缘情况
本文涵盖了最常见的模式强制情况,但有一些未讨论的边缘情况。有关模式强制边缘情况的更多信息,请参阅这篇博客文章。
为什么 Parquet 表中的错误写入难以修复
在 Parquet 表中,错误写入的撤销出乎意料地困难。假设你对存储在云中的分区 Parquet 表执行了具有不匹配模式的写入操作。写入操作将 500 个文件输出到不同的分区。
识别写入 Parquet 表的 500 个文件并不容易。事实上,当你尝试调试时,你甚至不知道写入了多少文件。
一旦你确定要删除的 500 个文件,你需要确保删除错误数据的脚本不会意外地删除好数据!手动删除数据是危险的。一个放错位置的全局字符串字符可能会导致你意外地清除所有数据。
手动删除数据还会破坏下游 ETL 管道自动化。这是一个危险的操作,可能导致 ETL 中断。
Delta Lake 提供了版本化数据和 ACID 事务,因此你无需执行任何 Parquet 表热修复。如果你犯了错误,可以轻松地将 Delta 表回滚到以前的版本。
Hive 元数据存储中 Parquet 表的模式强制
到目前为止,我们只讨论了没有关联 Hive 元数据存储条目的 Parquet 文件。存储在 Hive 元数据存储中的 Parquet 表具有完全不同的模式强制默认行为。
让我们在 Hive 元数据存储中创建一个 Parquet 表,并观察模式强制的默认值。
columns = ["first_name", "age"]
data = [("bob", 47), ("li", 23), ("leonard", 51)]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.format("parquet").saveAsTable("mystery_table")
尝试将一些具有不同模式的数据附加到表中。
columns = ["first_name", "favorite_color"]
data = [("sal", "red"), ("cat", "pink")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.mode("append").format("parquet").saveAsTable("mystery_table")
Parquet 表不允许附加具有不同模式的数据,并抛出此异常
AnalysisException: cannot resolve 'age' given input columns: [first_name, favorite_color]
存储在 Hive 元数据存储中的 Parquet 表只有在你使用表名访问它时才内置模式强制。如果你忘记使用表名而直接使用路径,则可以绕过模式强制并弄乱你的表。Delta Lake 模式强制是不可能绕过的。此外,你无法设置配置属性或使用 mergeSchema
来为 Parquet 表使用安全的模式演进。写入 Parquet 表时会忽略 mergeSchema
。
df.write.option("mergeSchema", "true").mode("append").format("parquet").saveAsTable(
"mystery_table"
)
这是抛出的错误
AnalysisException: cannot resolve 'age' given input columns: [first_name, favorite_color]
Delta Lake 提供的可定制模式强制优于托管 Parquet 表的严格模式强制。
结论
Delta Lake 内置模式强制,这是保护数据表质量的好方法。
没有 Hive 元数据存储信息的 Parquet 表没有内置模式强制,不允许良好的合并模式默认值,并且如果写入操作出错则难以修复。存储在 Hive 元数据存储中的 Parquet 表允许模式强制,但它很严格,不如 Delta Lake 提供的模式强制灵活。
模式强制是 Delta Lake 相对于 Parquet 表提供的众多优势之一。你可以轻松地从 Parquet 转换为 Delta Lake,并利用这些功能来处理你的工作负载。