The Linux Foundation Projects
Delta Lake

Delta Lake 模式演进

作者:Matthew Powers

本文将解释如何配置 Delta Lake 以允许模式随时间演进。你将了解模式演进的好处、何时启用此功能以及何时避免使用此功能。

你还将了解 Delta Lake 提供的模式演进为何优于数据湖中支持的模式演进。

此笔记本包含本博客文章中使用的所有代码片段,你可以跟着操作。

Delta Lake 模式演进示例

让我们首先创建一个 DataFrame,然后通过附加具有不同模式的数据来演示模式演进功能。

首先创建一个包含 first_nameage 列的 Delta 表

df = spark.createDataFrame([("bob", 47), ("li", 23), ("leonard", 51)]).toDF(
    "first_name", "age"
)

df.write.format("delta").save("tmp/fun_people")

现在尝试将具有不同模式的 DataFrame 附加到现有的 Delta 表。此 DataFrame 将包含 first_nameagecountry 列。

df = spark.createDataFrame([("frank", 68, "usa"), ("jordana", 26, "brasil")]).toDF(
    "first_name", "age", "country"
)

df.write.format("delta").mode("append").save("tmp/fun_people")

此代码会因 AnalysisException 而出错。Delta Lake 默认不允许你附加具有不匹配模式的数据。此功能称为模式强制。阅读这篇博客文章以了解更多关于 Delta Lake 模式强制的信息。

让我们看看一种绕过模式强制并利用模式演进灵活性的方法。

当 mergeSchema 设置为 true 时的 Delta Lake 模式演进

你可以将 mergeSchema 选项设置为 true,以写入 Delta 表并允许附加具有不匹配模式的数据;请参阅以下示例

df.write.option("mergeSchema", "true").mode("append").format("delta").save(
    "tmp/fun_people"
)

这是添加数据后 Delta 表的内容。

spark.read.format("delta").load("tmp/fun_people").show()

+----------+---+-------+
|first_name|age|country|
+----------+---+-------+
|   jordana| 26| brasil| # new
|     frank| 68|    usa| # new
|   leonard| 51|   null|
|       bob| 47|   null|
|        li| 23|   null|
+----------+---+-------+

Delta 表现在有三列。它以前只有两列。

当添加新列时,现有数据中 country 列中“缺失”的数据简单地标记为 null

每次你想写入不匹配的模式时都将 mergeSchema 设置为 true 可能会很繁琐。让我们看看如何默认启用模式演进。

使用 autoMerge 的 Delta Lake 模式演进

你可以通过将 autoMerge 设置为 true 来默认启用模式演进

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

autoMerge 设置为 true 后,你可以在不设置 mergeSchema 的情况下附加具有不同模式的 DataFrame。让我们向 Delta 表附加一个单列 DataFrame 来演示。

df = spark.createDataFrame([("dahiana",), ("sabrina",)]).toDF("first_name")

df.write.format("delta").mode("append").save("tmp/fun_people")

打印 DataFrame 的内容以确保数据已附加。

spark.read.format("delta").load("tmp/fun_people").show()

+----------+----+-------+
|first_name| age|country|
+----------+----+-------+
|   jordana|  26| brasil|
|     frank|  68|    usa|
|   leonard|  51|   null|
|       bob|  47|   null|
|        li|  23|   null|
|   sabrina|null|   null| # new
|   dahiana|null|   null| # new
+----------+----+-------+

此附加说明了两个概念

  • autoMerge 允许你避免在每次附加数据时显式设置 mergeSchema
  • 模式演进还允许你附加列数少于现有 Delta 表的 DataFrame

让我们创建一个与现有 Delta 表完全不同模式的 DataFrame,看看附加时会发生什么。

创建一个包含 id 列和几行数据的 DataFrame

df = spark.range(0, 3)

df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

将此 DataFrame 附加到 Delta 表

df.write.format("delta").mode("append").save("tmp/fun_people")

查看 Delta 表的内容

spark.read.format("delta").load("tmp/fun_people").show()

+----------+----+-------+----+
|first_name| age|country|  id|
+----------+----+-------+----+
|   jordana|  26| brasil|null|
|     frank|  68|    usa|null|
|   leonard|  51|   null|null|
|       bob|  47|   null|null|
|        li|  23|   null|null|
|   sabrina|null|   null|null|
|   dahiana|null|   null|null|
|      null|null|   null|   1| # new
|      null|null|   null|   2| # new
|      null|null|   null|   0| # new
+----------+----+-------+----+

当启用模式演进时,Delta Lake 甚至会附加这个模式完全不重叠的 DataFrame。如你所见,模式演进非常宽松。

如果你不想允许零模式重叠的附加,你可能需要添加一些“预附加检查”。

为何使用 Delta Lake 模式演进

你应该启用 Delta Lake 模式演进,以允许你的表的模式在不进行完全数据重写的情况下发生更改。

如前面的示例所示,模式演进相当宽松,它允许你将任何模式的 DataFrame 附加到现有的 Delta 表。DataFrame 可以包含额外的列、缺失的列或它们的任意组合。

模式演进通常最适用于你想添加几行或写入没有几行的数据,而不是用于全面的模式更改。此功能提供了很大的灵活性,因此你必须小心使用。

何时避免 Delta Lake 模式演进

模式强制是 Delta Lake 的强大功能,通常是一个明智的默认设置。将数据附加到 Delta 表时,你通常希望新数据的模式与现有表匹配。

如果你需要模式强制保证检查,则不应启用模式演进。启用模式演进时,模式强制检查会禁用。因此,只有在你需要这些检查时才启用它们。

模式演进可能会破坏下游流程。在生产管道中演进模式之前,你应该确保所有下游读取器仍能正常工作。

Delta Lake mergeSchema 与 autoMerge

Delta Lake mergeSchema 仅适用于对单个表的单次写入。如果你只想为单个表启用模式演进,这是一个不错的选择。

Delta Lake 的 autoMerge 选项会为写入任何表的写入激活模式演进。这可能非常方便但也危险。请记住,模式演进相当宽松——它允许你将任何模式的数据附加到任何表而没有任何限制。

只有在你确实需要那种程度的灵活性时才启用 autoMerge。如果你只想为单个作业或单个表启用模式演进,那么 mergeSchema 更安全。

Parquet 表的“模式演进”

Parquet 表不支持模式演进。让我们看看它们默认情况下如何表现。

创建一个包含 citycountry 列的 DataFrame,并将其写入 Parquet 表。

df = spark.createDataFrame([("delhi", "india"), ("baltimore", "usa")]).toDF(
    "city", "country"
)

df.write.format("parquet").mode("append").save("tmp/some_cities")

读取 Parquet 表并查看内容。

spark.read.format("parquet").load("tmp/some_cities").show()

+---------+-------+
|     city|country|
+---------+-------+
|baltimore|    usa|
|    delhi|  india|
+---------+-------+

现在创建一个只包含 id 列的另一个 DataFrame,并将其附加到 Parquet 表。

df = spark.range(0, 3)
df.write.format("parquet").mode("append").save("tmp/some_cities")

请注意,Parquet 表只是简单地接受了附加的具有不匹配模式的数据。Parquet 没有模式强制的概念。任何数据都可以附加到 Parquet 表中。

现在读取 Parquet 表

spark.read.format("parquet").load("tmp/some_cities").show()

+----+
|  id|
+----+
|null|
|null|
|   0|
|   1|
|   2|
+----+

这看起来不对劲!

这也不是你总是会得到的结果。你可能会再次运行代码并得到这个结果

spark.read.format("parquet").load("tmp/some_cities").show()

+---------+-----------+
|     city|    country|
+---------+-----------+
|  toronto|     canada|
|   manila|philippines|
|baltimore|        usa|
|    delhi|      india|
|     null|       null|
|     null|       null|
|     null|       null|
+---------+-----------+

Spark 只是从它遇到的第一个文件抓取模式,并假设它是所有其他文件的模式。在此示例中,它从包含 id 列的文件中抓取模式。它只是忽略了其他文件中的 citycountry 列,因为它认为它们不存在。

在确定模式时,Spark 会有意识地从单个 Parquet 文件中抓取模式。从所有文件中读取模式将是一项昂贵的计算,并会减慢所有读取速度。

你可以在执行读取时通过设置 mergeSchema 选项来强制 Spark 读取所有 Parquet 文件的模式。

spark.read.format("parquet").option("mergeSchema", "true").load(
    "tmp/some_cities"
).show()

+----+---------+-------+
|  id|     city|country|
+----+---------+-------+
|null|baltimore|    usa|
|null|    delhi|  india|
|   0|     null|   null|
|   1|     null|   null|
|   2|     null|   null|
+----+---------+-------+

注意:读取 Parquet 文件时的 mergeSchema 选项与写入 Delta 表时的 mergeSchema 选项完全不同!

当 Spark 在 mergeSchema 设置为 true 的情况下读取 Parquet 文件时,你会得到与读取 Delta 表类似的结果,但这要麻烦得多。

数据专业人员通常会读取多个 Parquet 表,不应该期望他们找出何时需要 mergeSchema。你不想默认启用它,因为那会减慢所有读取速度,即使在不需要时也是如此。将不匹配模式写入 Parquet 数据湖的个人可能会忘记通知所有读取器。这是一种危险的设计模式。

再次强调一个重要说明spark.read.format("parquet").option("mergeSchema", "true") 中的 mergeSchemadf.write.format("delta").option("mergeSchema", "true") 中的 mergeSchema 完全不同。前者用于在读取具有不同模式的 Parquet 数据湖时启用模式解析。后者用于在写入 Delta 表时允许模式演进。不要混淆它们。

Delta Lake 模式演进与数据湖

数据湖是读取时模式(schema on read),这意味着在读取数据时推断模式。如我们所见,在支持具有更改模式的数据时,读取时模式是一个缺点。你需要手动跟踪哪些数据表具有更改模式,或者在读取数据湖时总是将 mergeSchema 设置为 true,这会减慢所有读取速度。

Delta Lake 表是写入时模式(schema on write),这意味着在读取数据时模式已经定义。当附加具有其他模式的数据时,Delta Lake 会知晓。Delta Lake 通过查询事务日志而不是打开所有单个 Parquet 文件来计算表的最终模式。这使得 Delta 表的模式演进速度快,并且对用户来说更方便。

Delta Lake 表比数据湖具有多个优势,模式演进只是众多好处之一。

结论

本文教你如何使用 Delta Lake 启用模式演进以及使用灵活模式管理 Delta 表的好处。

你学习了两种允许模式演进的方法及其权衡。除非你想全局为所有表启用模式演进,否则通常最好使用 mergeSchema 进行模式演进。

你还看到了 Delta Lake 如何提供真正的模式演进。Parquet 表提供了类似于模式演进的功能,但它需要读取时设置,这对用户来说很不方便。它效率也很低,因为它需要在读取发生之前检查所有 Parquet 文件。这是 Delta Lake 比数据湖更强大的众多示例之一。

LinkedIn 上关注我们的作者