The Linux Foundation Projects
Delta Lake

如何从 Delta Lake 表中删除列

作者:Matthew Powers

这篇文章将教你如何从 Delta Lake 表中删除列。你可能希望删除列以节省存储成本、满足监管要求,或者仅仅为了方便,当某一列包含不再需要的数据时。

有两种不同的方法可以从 Delta Lake 表中删除列。本文将向你展示这两种方法并解释其权衡,以便你可以根据自己的情况选择最佳方法。

它还将让你直观地了解 Delta Lake 是如何实现删除列功能的。了解像删除列这样简单的操作是如何在幕后执行的,是提升你的 Delta Lake 技能的好方法。

Delta Lake 删除列语法

让我们创建一个小型的 Delta 表,然后删除一列。假设你有以下表格。

+--------+------------+
|language|num_speakers|
+--------+------------+
|Mandarin|         1.1|
| English|         1.5|
|   Hindi|         0.6|
+--------+------------+

下面是如何从表中删除 language 列。

ALTER TABLE `my_cool_table` DROP COLUMN language

让我们浏览整个代码片段,这样你就可以在本地机器上运行这个例子。如果你还没有安装 PySpark 和 Delta Lake,你还需要在本地安装它们,请参见此处的安装说明。如果你使用的是 Spark 运行时(如 Databricks),这些依赖项已经为你安装好了。

下面是如何使用 Delta 创建 SparkSession(如果你使用的是像 Databricks 这样的 Spark 运行时环境,它会在你每次启动笔记本时自动为你提供 SparkSession,则无需运行此代码)。

import pyspark
from delta import *

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

一旦 SparkSession 被实例化,就可以轻松地创建一个包含两列三行数据的小表。

columns = ["language", "num_speakers"]
data = [("English", "1.5"), ("Mandarin", "1.1"), ("Hindi", "0.6")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)

df.write.format("delta").saveAsTable("default.my_cool_table")

spark.sql("select * from `my_cool_table`").show()
+--------+------------+
|language|num_speakers|
+--------+------------+
|Mandarin|         1.1|
| English|         1.5|
|   Hindi|         0.6|
+--------+------------+

让我们运行删除列语句,然后输出表以确保数据确实已被删除。我们需要启用列映射模式来执行此操作。

spark.sql(
    """ALTER TABLE `my_cool_table` SET TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.minReaderVersion' = '2',
   'delta.minWriterVersion' = '5')"""
)

spark.sql("ALTER TABLE `my_cool_table` DROP COLUMN language")

spark.sql("select * from `my_cool_table`").show()
+------------+
|num_speakers|
+------------+
|         1.1|
|         1.5|
|         0.6|
+------------+

如果你想在本地机器上运行这些计算,这里是笔记本的链接

Delta Lake 删除列实现

DROP COLUMN 是在 Delta Lake 2.0 中添加的,它允许通过元数据操作删除列。当你删除列时,Delta 只需在事务日志中创建一个条目,以指示查询应该忽略该已删除的列。

这是你删除列之前的模式

spark.sql("select * from `my_cool_table`").printSchema()

root
 |-- language: string (nullable = true)
 |-- num_speakers: string (nullable = true)

这是你删除列时在事务日志中创建的条目

{
   "commitInfo":{
      "timestamp":1659886555675,
      "operation":"DROP COLUMNS",
      "operationParameters":{
         "columns":"[\"language\"]"
      },
      "readVersion":1,
      "isolationLevel":"Serializable",
      "isBlindAppend":true,
      "operationMetrics":{

      },
      "engineInfo":"Apache-Spark/3.2.2 Delta-Lake/2.0.0",
      "txnId":"72294000-c6b4-4eba-8cc6-9d207cc01291"
   }
}

这是你删除列之后的模式

spark.sql("select * from `my_cool_table`").printSchema()

root
 |-- num_speakers: string (nullable = true)

在 Delta Lake 2.0 之前,DROP COLUMN 不可用,因此用户需要实际重写整个数据集来执行此操作。对于大型数据集,删除大型数据集中的列在计算上是昂贵的。

让我们看看 Delta Lake 添加删除列支持之前需要的方法。

Delta Lake 2.0 版本之前删除列

让我们创建另一个例子,说明在添加列映射之前,你需要如何从 Delta Lake 中删除列。

创建另一个名为 another_cool_table 的表,其中包含与之前相同的 languagenum_speakers 列,以便我们可以演示如何通过完整的数据重写来删除列。

columns = ["language", "num_speakers"]
data = [("Spanish", "0.5"), ("French", "0.3"), ("Arabic", "0.3")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)

df.write.format("delta").saveAsTable("default.another_cool_table")

确保表已正确创建

df = spark.sql("select * from another_cool_table")

df.show()

+--------+------------+
|language|num_speakers|
+--------+------------+
| Spanish|         0.5|
|  Arabic|         0.3|
|  French|         0.3|
+--------+------------+

将表读取到 DataFrame 中,删除列,然后将新的 DataFrame 写入 Delta Lake。

df = df.drop("num_speakers")

df.write.format("delta").mode("OVERWRITE").option(
    "overwriteSchema", "true"
).saveAsTable("default.another_cool_table")

确认 num_speakers 列已从 Delta Lake 中删除。

spark.sql("select * from another_cool_table").show()

+--------+
|language|
+--------+
| Spanish|
|  French|
|  Arabic|
+--------+

这种方法有效,但它比简单地通过元数据操作删除列要慢得多。假设你有一个包含 10 TB 数据和 100 列的 Delta Lake,并且你希望删除其中包含 100 GB 数据的列。

启用列映射后,你可以通过向事务日志添加元数据条目来删除此列,这将在几分之一秒内执行。

如果你将所有 10 TB 数据读取到 DataFrame 中,使用 DataFrame API 删除一列,然后重写所有数据,则操作将花费更长时间。简单地删除一列可能是一个大数据处理操作。

让我们更详细地探讨实现细节的权衡,因为它们在某些监管和成本情况下很重要。

Delta Lake 删除列的其他注意事项

有些用户出于监管目的需要从磁盘中物理删除列。他们不能简单地执行元数据操作并忽略该列。数据需要物理删除。

如果你需要从系统中完全删除该列,那么你应该覆盖现有数据,然后执行 vacuum 操作。这是从磁盘中物理删除列的方法。

Delta Lake 将数据存储在 Parquet 文件中,这些文件是列式的,因此磁盘上多余的列不会对查询性能产生不利影响。运行查询时,已删除的列会被简单地忽略。Parquet 的列式特性允许列裁剪,这是 Parquet 比基于行的文件格式(如 CSV)更快的主要原因之一。

你需要为存储通过元数据操作“删除”的列支付额外费用,因为你实际上并没有从 Parquet 文件中删除该列。当列被物理删除时,你无需支付存储费用。正如我们之前讨论的,物理删除列需要计算,这会花费金钱。计算通常比存储更昂贵。计算与存储的权衡超出了本文的范围,但将在未来的文章中更详细地介绍。你通常不需要考虑这些成本,因为它们非常小,但在从高层次思考整体成本管理时值得牢记。

下一步

这篇博文向你展示了如何使用 DROP COLUMN 语法和通过覆盖现有数据湖来从 Delta Lake 表中删除列。

你已经了解了删除表的两种方法之间的权衡。DROP COLUMN 快得多,但它实际上并没有物理删除磁盘上的数据,因此可能不足以满足某些监管要求。一般来说,DROP COLUMN 更可取,但如果出于 GDPR 合规性需要物理删除列,请使用其他方法。

Delta Lake 正在不断改进,对 DROP COLUMN 的改进支持只是一个例子。该项目正在迅速发展,为 Delta Lake 用户添加越来越多的功能。你可以加入我们的活跃 Slack 社区,或者随时在GitHub 仓库上提交问题提供反馈。我们有一个开放、活跃和友好的社区——我们鼓励你加入。

LinkedIn 上关注我们的作者