如何从 Delta Lake 表中删除列
这篇文章将教你如何从 Delta Lake 表中删除列。你可能希望删除列以节省存储成本、满足监管要求,或者仅仅为了方便,当某一列包含不再需要的数据时。
有两种不同的方法可以从 Delta Lake 表中删除列。本文将向你展示这两种方法并解释其权衡,以便你可以根据自己的情况选择最佳方法。
它还将让你直观地了解 Delta Lake 是如何实现删除列功能的。了解像删除列这样简单的操作是如何在幕后执行的,是提升你的 Delta Lake 技能的好方法。
Delta Lake 删除列语法
让我们创建一个小型的 Delta 表,然后删除一列。假设你有以下表格。
+--------+------------+
|language|num_speakers|
+--------+------------+
|Mandarin| 1.1|
| English| 1.5|
| Hindi| 0.6|
+--------+------------+
下面是如何从表中删除 language
列。
ALTER TABLE `my_cool_table` DROP COLUMN language
让我们浏览整个代码片段,这样你就可以在本地机器上运行这个例子。如果你还没有安装 PySpark 和 Delta Lake,你还需要在本地安装它们,请参见此处的安装说明。如果你使用的是 Spark 运行时(如 Databricks),这些依赖项已经为你安装好了。
下面是如何使用 Delta 创建 SparkSession(如果你使用的是像 Databricks 这样的 Spark 运行时环境,它会在你每次启动笔记本时自动为你提供 SparkSession,则无需运行此代码)。
import pyspark
from delta import *
builder = (
pyspark.sql.SparkSession.builder.appName("MyApp")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
一旦 SparkSession 被实例化,就可以轻松地创建一个包含两列三行数据的小表。
columns = ["language", "num_speakers"]
data = [("English", "1.5"), ("Mandarin", "1.1"), ("Hindi", "0.6")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.format("delta").saveAsTable("default.my_cool_table")
spark.sql("select * from `my_cool_table`").show()
+--------+------------+
|language|num_speakers|
+--------+------------+
|Mandarin| 1.1|
| English| 1.5|
| Hindi| 0.6|
+--------+------------+
让我们运行删除列语句,然后输出表以确保数据确实已被删除。我们需要启用列映射模式来执行此操作。
spark.sql(
"""ALTER TABLE `my_cool_table` SET TBLPROPERTIES (
'delta.columnMapping.mode' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5')"""
)
spark.sql("ALTER TABLE `my_cool_table` DROP COLUMN language")
spark.sql("select * from `my_cool_table`").show()
+------------+
|num_speakers|
+------------+
| 1.1|
| 1.5|
| 0.6|
+------------+
如果你想在本地机器上运行这些计算,这里是笔记本的链接。
Delta Lake 删除列实现
DROP COLUMN
是在 Delta Lake 2.0 中添加的,它允许通过元数据操作删除列。当你删除列时,Delta 只需在事务日志中创建一个条目,以指示查询应该忽略该已删除的列。
这是你删除列之前的模式
spark.sql("select * from `my_cool_table`").printSchema()
root
|-- language: string (nullable = true)
|-- num_speakers: string (nullable = true)
这是你删除列时在事务日志中创建的条目
{
"commitInfo":{
"timestamp":1659886555675,
"operation":"DROP COLUMNS",
"operationParameters":{
"columns":"[\"language\"]"
},
"readVersion":1,
"isolationLevel":"Serializable",
"isBlindAppend":true,
"operationMetrics":{
},
"engineInfo":"Apache-Spark/3.2.2 Delta-Lake/2.0.0",
"txnId":"72294000-c6b4-4eba-8cc6-9d207cc01291"
}
}
这是你删除列之后的模式
spark.sql("select * from `my_cool_table`").printSchema()
root
|-- num_speakers: string (nullable = true)
在 Delta Lake 2.0 之前,DROP COLUMN
不可用,因此用户需要实际重写整个数据集来执行此操作。对于大型数据集,删除大型数据集中的列在计算上是昂贵的。
让我们看看 Delta Lake 添加删除列支持之前需要的方法。
Delta Lake 2.0 版本之前删除列
让我们创建另一个例子,说明在添加列映射之前,你需要如何从 Delta Lake 中删除列。
创建另一个名为 another_cool_table
的表,其中包含与之前相同的 language
和 num_speakers
列,以便我们可以演示如何通过完整的数据重写来删除列。
columns = ["language", "num_speakers"]
data = [("Spanish", "0.5"), ("French", "0.3"), ("Arabic", "0.3")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.write.format("delta").saveAsTable("default.another_cool_table")
确保表已正确创建
df = spark.sql("select * from another_cool_table")
df.show()
+--------+------------+
|language|num_speakers|
+--------+------------+
| Spanish| 0.5|
| Arabic| 0.3|
| French| 0.3|
+--------+------------+
将表读取到 DataFrame 中,删除列,然后将新的 DataFrame 写入 Delta Lake。
df = df.drop("num_speakers")
df.write.format("delta").mode("OVERWRITE").option(
"overwriteSchema", "true"
).saveAsTable("default.another_cool_table")
确认 num_speakers
列已从 Delta Lake 中删除。
spark.sql("select * from another_cool_table").show()
+--------+
|language|
+--------+
| Spanish|
| French|
| Arabic|
+--------+
这种方法有效,但它比简单地通过元数据操作删除列要慢得多。假设你有一个包含 10 TB 数据和 100 列的 Delta Lake,并且你希望删除其中包含 100 GB 数据的列。
启用列映射后,你可以通过向事务日志添加元数据条目来删除此列,这将在几分之一秒内执行。
如果你将所有 10 TB 数据读取到 DataFrame 中,使用 DataFrame API 删除一列,然后重写所有数据,则操作将花费更长时间。简单地删除一列可能是一个大数据处理操作。
让我们更详细地探讨实现细节的权衡,因为它们在某些监管和成本情况下很重要。
Delta Lake 删除列的其他注意事项
有些用户出于监管目的需要从磁盘中物理删除列。他们不能简单地执行元数据操作并忽略该列。数据需要物理删除。
如果你需要从系统中完全删除该列,那么你应该覆盖现有数据,然后执行 vacuum 操作。这是从磁盘中物理删除列的方法。
Delta Lake 将数据存储在 Parquet 文件中,这些文件是列式的,因此磁盘上多余的列不会对查询性能产生不利影响。运行查询时,已删除的列会被简单地忽略。Parquet 的列式特性允许列裁剪,这是 Parquet 比基于行的文件格式(如 CSV)更快的主要原因之一。
你需要为存储通过元数据操作“删除”的列支付额外费用,因为你实际上并没有从 Parquet 文件中删除该列。当列被物理删除时,你无需支付存储费用。正如我们之前讨论的,物理删除列需要计算,这会花费金钱。计算通常比存储更昂贵。计算与存储的权衡超出了本文的范围,但将在未来的文章中更详细地介绍。你通常不需要考虑这些成本,因为它们非常小,但在从高层次思考整体成本管理时值得牢记。
下一步
这篇博文向你展示了如何使用 DROP COLUMN
语法和通过覆盖现有数据湖来从 Delta Lake 表中删除列。
你已经了解了删除表的两种方法之间的权衡。DROP COLUMN
快得多,但它实际上并没有物理删除磁盘上的数据,因此可能不足以满足某些监管要求。一般来说,DROP COLUMN
更可取,但如果出于 GDPR 合规性需要物理删除列,请使用其他方法。
Delta Lake 正在不断改进,对 DROP COLUMN
的改进支持只是一个例子。该项目正在迅速发展,为 Delta Lake 用户添加越来越多的功能。你可以加入我们的活跃 Slack 社区,或者随时在GitHub 仓库上提交问题提供反馈。我们有一个开放、活跃和友好的社区——我们鼓励你加入。