Hive 风格分区的优缺点
这篇博文解释了 Hive 风格分区何时是一种有用的数据管理技术,以及为什么它可能对数据湖和 Lakehouse 存储系统用户带来缺点。
您还将了解分区演变等技术如何克服 Hive 风格分区的一些限制,但仍然存在 Hive 风格分区的根本问题。
Hive 风格分区将数据物理分离到文件夹中。这是一个按国家/地区分区的数据集示例。
country="Angola"/
fileA.parquet
fileB.parquet
country="Colombia"/
fileC.parquet
fileD.parquet
fileE.parquet
在转向缺点之前,让我们看看 Hive 风格分区如何加速数据湖上的查询。
数据湖的 Hive 风格分区
当查询允许跳过某些数据文件时,Hive 风格分区可以更快地在数据湖上运行查询。
假设您要运行以下查询:select count(*) from the_table where country = 'Angola'
。如果数据湖按 country
列分区,此查询将运行得更快。查询引擎只需列出并读取 country='Angola'
目录中的数据文件。它可以跳过其他目录中的数据文件。
引擎需要运行文件列表操作来确定必须为不同查询读取的文件。Hive 风格分区允许查询引擎为某些查询读取更少的文件。
对于像 select count(*) from the_table
这样的查询,Hive 风格分区不允许任何数据跳过,因此查询不会运行得更快。Hive 风格分区实际上可能会使无法利用数据跳过的查询运行得慢得多。
文件列表操作的执行方式取决于底层存储系统。在 Mac OS 等基于 Unix 的文件系统上执行文件列表操作的方式与在 Amazon S3 等基于云的键值存储上执行的方式不同。
在键值对象存储上,全局查找嵌套目录要慢得多。较慢的文件列表操作只是 Hive 风格分区可能导致查询变慢的一种方式。
Hive 风格分区可能会加剧小文件问题
Hive 风格分区也可能使小文件问题更加严重。
如果数据湖包含大量小文件,查询引擎通常会运行得更慢。与 10,000 个每个 0.01 GB 的文件相比,查询在 100 个每个 1 GB 的文件上通常会运行得更快。
Hive 风格分区要求数据存储在单独的文件中,即使它只是一行数据。假设您正在按 columnA 对数据集进行分区,该列有 1,000 个不同的值。这意味着您的数据集必须写入至少 1,000 个不同的文件。
进一步假设您每小时用新数据更新此数据集。这意味着每小时最多创建 1,000 个文件,即每天 24,000 个文件。Hive 风格分区可能导致小文件快速增长。
如果您每小时只摄取 1GB 数据,那么每小时写入多达 1,000 个文件是不明智的。您可以通过定期压缩来修复小文件,但这会浪费计算周期。
数据通常是倾斜的,理想情况是对最常见的列值进行完全分区,但对长尾列值共享分区。Hive 风格分区对于倾斜数据集来说过于严格。
让我们深入了解 Lakehouse 存储系统的架构,以了解它们如何以不同的方式列出文件以及为什么甚至不需要物理磁盘分区。
Lakehouse 存储系统的 Hive 风格分区
Lakehouse 存储系统(如 Delta Lake)将数据存储在 Parquet 文件中,并将有关文件的元数据存储在事务日志中。
在查询 Lakehouse 存储系统时,引擎会在事务日志中查找文件路径——它们不需要执行文件列表操作。避免文件列表操作是 Lakehouse 存储系统相对于数据湖的一个主要优势。
当使用 Lakehouse 存储系统时,引擎不需要物理磁盘分区即可享受磁盘分区带来的数据跳过好处。引擎可以通过查阅事务日志获得所有文件跳过的好处。它不需要全局查找目录来识别哪些文件包含某些分区值。
让我们看看使用 Delta Lake 的 Hive 风格分区表,并解释为什么支持这种数据管理技术。
Delta Lake 表的 Hive 风格分区
让我们创建一个小的分区 Delta 表来演示该功能。首先创建一个包含人员的 DataFrame。
+----------+---------+---------+
|first_name|last_name| country|
+----------+---------+---------+
| Ernesto| Guevara|Argentina|
| Maria|Sharapova| Russia|
| Bruce| Lee| China|
| Jack| Ma| China|
+----------+---------+---------+
现在将此 DataFrame 写入 Hive 风格分区的 Delta 表
(
df.repartition(F.col("country"))
.write.partitionBy("country")
.format("delta")
.saveAsTable("country_people")
)
注意:此处使用 repartition()
是为了示例目的,每个分区写入一个文件。
查看存储中 Delta 表的内容
spark-warehouse/country_people
├── _delta_log
│ └── 00000000000000000000.json
├── country=Argentina
│ └── part-00000-0e188daf-7ed1-4a46-9786-251e5a5b7c61.c000.snappy.parquet
├── country=China
│ └── part-00000-69aeadfb-3692-4765-94bc-f4b271133b35.c000.snappy.parquet
└── country=Russia
└── part-00000-d3a4d532-74f9-4304-970d-b476cf296a07.c000.snappy.parquet
Delta 表由 Parquet 文件组成,数据结构为嵌套目录。事务日志包含有关文件的信息,包括分区结构。
当引擎查询 Delta 表时,它们从事务日志中找出文件位置和分区信息。它们不需要运行文件列表操作或全局查找相关文件。Delta 表的物理分区实际上是不必要的,文件可以简单地进行逻辑分区。Delta Lake 支持物理分区的唯一原因是与支持 Hive 风格分区的其他引擎兼容,并使转换成为可能。
Hive 风格分区的 Parquet 数据湖可以转换为 Delta 表(反之亦然),因为 Delta Lake 支持 Hive 风格分区。
让我们看看 Hive 风格分区的一些其他限制以及如何更好地分离数据。
分区演变以满足不断变化的分区需求
分区演变允许您更改现有表的分区方案,有时被吹捧为解决 Hive 风格分区限制的方案。
如果您想纠正表分区中的错误,例如从按天分区更新为按小时分区,分区演变会很有用。分区演变允许您进行此切换而无需重写数据表。
分区演变允许您纠正选择错误分区键的错误,但它是一种权宜之计,并不能解决根本问题。
数据分离过于严格、小文件和文件列表缓慢(对于数据湖)的问题仍然存在。现在让我们将注意力转向 Z 排序,它更可持续地解决了一些 Hive 风格分区问题。
Z 排序而不是 Hive 风格分区
在 Lakehouse 存储系统中对表运行查询时,您可以根据列级元数据或事务日志中的分区信息跳过文件。
正如我们已经讨论过的,Lakehouse 存储系统不需要 Hive 风格分区,后者使用基于目录结构的文件列表操作来跳过数据。Delta 表还可以根据事务日志中存储的 min/max 列值跳过文件。
对数据进行 Z 排序可以使基于 min/max 文件级元数据的文件跳过更有效。
Z 排序还可以为更广泛的查询模式更好地进行文件跳过。假设您有一个包含 col_a
和 col_b
的表,并且您希望所有以下类型的查询运行得更快
- 查询 A:根据
col_a
进行过滤 - 查询 B:根据
col_b
进行过滤 - 查询 C:同时根据
col_a
和col_b
进行过滤
根据 col_a
和 col_b
对数据集进行 Z 排序将使查询 A、查询 B 和查询 C 运行得更快。如果表按 col_a
分区,则只有查询 A 会运行得更快。
在某些用例中,Z 排序可能比 Hive 风格分区更好,但它也有很多权衡。Z 排序和 Hive 风格分区也不是互斥的——表可以分区,并且每个分区都可以进行 Z 排序。
有关 Z 排序的完整描述,请参阅此帖子。就本次讨论而言,最重要的结论是,用户不需要将数据分离到子文件夹中即可享受文件跳过的好处。Lakehouse 存储系统中的数据跳过也可以基于文件级列 min/max 元数据进行。
现在让我们看看 Hive 风格分区被使用的另一个与性能无关的原因。
并发性的 Hive 风格分区
某些类型的操作可以通过 Hive 风格分区绕过并发问题。
例如,对相同数据的并发更新和删除操作可能会冲突并引发错误。
您可以通过在不重叠的分区上运行更新和删除操作来绕过 Hive 风格分区表上的这些并发错误。例如,这两个命令可以在按 date
分区的 Delta 表上并发运行
UPDATE table WHERE date > '2010-01-01'
DELETE table WHERE date < '2010-01-01'
Hive 风格分区允许用户在某些情况下绕过并发问题,但正如我们之前提到的,将数据分离到子目录中并非严格必要才能实现数据的完全划分。逻辑分区(将数据分离到单独的文件中并将其记录在事务日志中)是足够的,物理分区(将数据分离到单独的文件夹中)并非严格必要。
使用生成列对时间序列数据进行分区
🛑 反模式:按未分箱的 TimestampType
列分区
在使用事务数据/事件时,数据通常包含某种时间戳属性。数据分析通常侧重于最新数据,这就是为什么使用时间戳作为分区列听起来很有前景。这里的问题是,按高基数列(如 TimestampType
列)进行分区肯定会导致过度分区,从而创建大量仅包含少量数据的分区。
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime
schema = T.StructType([
T.StructField('event_id', T.LongType()),
T.StructField('event_timestamp', T.TimestampType()),
T.StructField('event_payload', T.StringType()),
])
data = [
( 1, datetime.fromisoformat("1990-06-15 09:01:01"), "Australia"),
( 2, datetime.fromisoformat("1990-06-15 09:01:02"), "Botswana"),
( 1_000_000, datetime.fromisoformat("1990-12-31 12:34:56"), "Costa Rica"),
(1_000_000_000, datetime.fromisoformat("2000-01-10 12:34:56"), "Denmark"),
]
df = spark.createDataFrame(data, schema)
(
df
.coalesce(1) # only for demonstration purposes, so per partition, one file is written
.write
.format("delta")
.mode("overwrite")
.partitionBy("event_timestamp")
.saveAsTable("events")
)
events
├── _delta_log/
│ └── 00000000000000000000.json
├── event_timestamp=1990-06-15 09:01:01/
│ └── part-00001-77330743-946f-4f6a-830e-37a575d5234f.c000.snappy.parquet (1 rows)
├── event_timestamp=1990-06-15 09:01:02/
│ └── part-00003-d4e51376-087d-45fb-b472-d392c3991dab.c000.snappy.parquet (1 rows)
├── event_timestamp=1990-12-31 12:34:56/
│ └── part-00005-0ca14c69-bdcb-4233-b075-da74bc8b0f97.c000.snappy.parquet (1 rows)
└── event_timestamp=2000-01-10 12:34:56/
└── part-00007-66f59e03-5c5c-4b7f-923b-3059f928e06f.c000.snappy.parquet (1 rows)
如所示,即使事件 1 和 2 几乎同时发生,所有行也将最终位于单独的分区中。
✅ 按分箱的 TimestampType
列分区(使用生成表达式)
一个更好的方法是,将 TimestampType
值“分箱”到更粗粒度,从而降低值的基数(例如,按小时、按天、按年)。这可以通过生成列轻松完成,利用DATE_TRUNC
函数。此处生成列的优点是,在表创建后向表中追加数据时,它将在插入期间自动从引用列计算(并且列不需要包含在追加的 DataFrame
中)。此外,自 Delta 2.3 起,当使用分区列的生成表达式引用的列上的谓词查询表时,也将支持分区裁剪。在执行 SELECT * FROM events WHERE event_timestamp = '1990-06-15 09:01:01'
时,Delta 将告诉引擎最初只读取分区 event_timestamp_bin=1990-01-01
,然后过滤 event_timestamp = '1990-06-15 09:01:01'
在以下示例中。
generation_expression = "DATE_TRUNC('YEAR', event_timestamp)"
(
df
.withColumn("event_timestamp_bin", F.expr(generation_expression)) # generated a new column that contains the desired timestamp granularity
.withMetadata("event_timestamp_bin", {"delta.generationExpression": generation_expression}) # this will tell Delta that this is a generated column
.coalesce(1) # only for demonstration purposes, so per partition, one file is written
.write
.format("delta")
.mode("overwrite")
.partitionBy("event_timestamp_bin")
.saveAsTable("events")
)
events
├── _delta_log/
│ └── 00000000000000000000.json
├── event_timestamp_bin=1990-01-01 00:00:00/
│ └── part-00000-0ba92f13-29ee-410b-8943-298fa8e86f4e.c000.snappy.parquet (3 rows)
└── event_timestamp_bin=2000-01-01 00:00:00/
└── part-00000-57b8e78f-a752-4285-8cab-25be3aa632f4.c000.snappy.parquet (1 rows)
此处可见,三行将存储在同一分区中(不再过度分区)。
查看分区裁剪的实际操作
spark.table("events").filter(F.col("event_timestamp_bin") == '1990-01-01').explain()
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet spark_catalog.delta_blog.events[event_id#4761L,event_timestamp#4762,event_payload#4763,event_timestamp_bin#4764]
Batched: true,
DataFilters: [],
Format: Parquet,
Location: PreparedDeltaFileIndex(1 paths)[dbfs:/user/hive/warehouse/delta_blog.db/events],
PartitionFilters: [isnotnull(event_timestamp_bin#4764), (event_timestamp_bin#4764 = 1990-01-01 00:00:00)],
PushedFilters: [],
ReadSchema: struct<event_id:bigint,event_timestamp:timestamp,event_payload:string>
注意 PartitionFilters
spark.table("events").filter(F.col("event_timestamp") == '1990-06-15 09:01:02').explain()
== Physical Plan ==
*(1) Filter (isnotnull(event_timestamp#4916) AND (event_timestamp#4916 = 1990-06-15 09:01:02))
+- *(1) ColumnarToRow
+- FileScan parquet spark_catalog.delta_blog.events[event_id#4915L,event_timestamp#4916,event_payload#4917,event_timestamp_bin#4918]
Batched: true,
DataFilters: [isnotnull(event_timestamp#4916), (event_timestamp#4916 = 1990-06-15 09:01:02)],
Format: Parquet,
Location: PreparedDeltaFileIndex(1 paths)[dbfs:/user/hive/warehouse/delta_blog.db/events],
PartitionFilters: [((event_timestamp_bin#4918 = date_trunc(MONTH, 1990-06-15 09:01:02, Some(Etc/UTC))) OR isnull((e..., PushedFilters: [IsNotNull(event_timestamp), EqualTo(event_timestamp,1990-06-15 09:01:02.0)],
ReadSchema: struct<event_id:bigint,event_timestamp:timestamp,event_payload:string>
注意 PartitionFilters
🔀 分区演变:更改分区列的分箱粒度
预先估计正确的分区粒度非常困难。这里的经验法则是,每个分区中至少应有 1 GB 的数据。因此,表最终出现过度分区(许多分区只包含少量数据)或欠分区(少量分区包含大量数据)的情况相当常见。随后更改分区的粒度对于 Delta 来说很容易,但代价是重写整个表。
在此示例中,读取 Delta 表(之前按年分箱);转换相应的 DataFrame
(以便分区值按月分箱),并用于简单地就地覆盖 Delta 表。由于 Delta 的版本控制/时间旅行功能,这将创建具有更改生成表达式的表的新版本。因此无需创建临时表并随后交换它。
spark.table("events")
new_generation_expression = "DATE_TRUNC('MONTH', event_timestamp)"
(
spark.table("events")
.withColumn("event_timestamp_bin", F.expr(new_generation_expression))
.withMetadata("event_timestamp_bin", {"delta.generationExpression": new_generation_expression})
.coalesce(1) # only for demonstration purposes, so per partition, one file is written
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "True") # this is required, because we change the generation expression that is considered part of the schema
.partitionBy("event_timestamp_bin")
.saveAsTable("events")
)
events
├── _delta_log/
│ └── 00000000000000000000.json (old generation expression / partitioning scheme)
│ └── 00000000000000000001.json (new generation expression / partitioning scheme)
├── event_timestamp_bin=1990-01-01 00:00:00/
├── event_timestamp_bin=1990-06-01 00:00:00/
│ └── part-00000-cc206daa-ed02-4277-a340-e73b103f1cb3.c000.snappy.parquet (2 rows)
├── event_timestamp_bin=1990-12-01 00:00:00/
│ └── part-00000-886aa276-3211-4c45-8a5a-6d138809b39b.c000.snappy.parquet (1 rows)
└── event_timestamp_bin=2000-01-01 00:00:00/
└── part-00000-70d65a32-e9cd-4503-8822-3fe1a7e36586.c000.snappy.parquet (1 rows)
结论
Hive 风格分区是数据湖的一项重要数据管理技术,因为它允许数据跳过。数据湖没有带有文件级元数据统计信息的事务日志,因此跳过文件的唯一方法是在存储中物理分区数据。
Hive 风格分区允许完全数据分离,受到许多旧版引擎的支持,并且可以帮助用户绕过并发冲突,因此在某些情况下很有用。
然而,Hive 风格分区也有许多缺点。它创建了僵硬的表结构,可能导致创建大量小文件,并且只适用于基数相对较低的列。表通常以不同的方式查询,Hive 风格分区只会使某些查询更快(并且可能使其他查询变慢)。
Delta Lake 支持更好的数据共置方式,提供 Hive 风格分区的好处而没有缺点。