The Linux Foundation Projects
Delta Lake

Delta Lake 如何使用元数据使某些聚合更快

作者:Matthew PowersScott Sandre

本篇博客文章解释了 Delta Lake 如何更新以利用元数据,从而使某些聚合基准测试在大型数据集上运行得更快。

这些聚合改进已添加到 Delta Lake 2.2.0 中,因此基准测试将比较 Delta Lake 2.2.0 与 Delta Lake 2.1.1。

我们来看看这些基准测试是如何运行的。

基准测试设置

我们生成了一个包含五列的合成数据集。以下是数据集中的几行:

+------+------------------------------------+----+-----+---+-----+
|id    |uuid                                |year|month|day|value|
+------+------------------------------------+----+-----+---+-----+
|20470 |09797ddf-13ab-4ccd-bb6a-d6b1693e8123|2010|10   |2  |470  |
|148546|864b31b5-f337-4e08-afe3-d72c834efa1f|2002|10   |6  |546  |
|459007|08a60ad9-664a-4609-bf25-2598bef92520|2021|7    |3  |7    |
|248874|1ebee359-4548-4509-9bc0-5e11bcc49336|2010|6    |10 |874  |
|139382|5eee63fe-d559-4002-857b-3251a8e4df80|2012|2    |26 |382  |
+------+------------------------------------+----+-----+---+-----+

我们将此数据集写入了 100、1_000、10_000 和 100_000 个不同的文件,以便为运行基准测试提供不同的场景。请参阅附录中用于生成合成数据集的脚本。

查询是在一台配备 64GB 内存的 Macbook M1 笔记本电脑上运行的。我们来看看结果。

计算 Delta Lake 表中的所有数据行

我们来看看 Delta Lake 2.2+ 如何让以下查询运行得更快:select count(*) from the_table

此查询在 100、1_000、10_000 和 100_000 个文件上针对 Delta Lake 2.1.1 和 Delta Lake 2.2 运行,如下图所示:

对于 Delta Lake 2.1.1 及更早版本,查询运行时随着文件数量的增加而增加。随着 Delta Lake 2.2 中添加的新优化,查询时间不会随着文件数量的增加而增加。

Delta 表中每个文件的记录数包含在 Delta 事务日志中。您无需实际查询数据来计算记录数,只需查阅元数据即可。

Delta Lake 2.2+ 将此查询作为纯元数据操作执行,这就是它能够扩展到更多文件而无需更长时间运行的原因。

我们来看看 Delta Lake 2.2 中添加的另一个查询性能增强功能。

从 Delta 表中选择单行数据

我们来看看 Delta Lake 2.2+ 如何让以下查询运行得更快:select * from the_table limit 1

此查询在 100、1_000、10_000 和 100_000 个文件上针对 Delta Lake 2.1.1 和 Delta Lake 2.2 运行,如下图所示:

Delta Lake 2.1.1 在查询规划期间从未考虑过限制。因此,Delta Lake 生成的物理计划涉及扫描表中的所有数据。然后 Spark 将把 LIMIT 应用到该结果上。

而 Delta Lake 2.2 在查询规划期间确实考虑了限制,并内部使用每个文件的统计信息来确定需要读取的最小数据文件量。有了这个小得多的集合,Spark 可以只读取几个文件,然后将 LIMIT 应用到那个小得多的结果上。请在此处查看实现此功能的提交:https://github.com/delta-io/delta/commit/1a94a585b74477896cbcae203fc26eaca733cbaa

在这个使用 select * from the_table limit 1 的示例中,Spark 只需从单个文件中读取一行。无论 Delta 表中有多少文件,此查询都将能够以同样的速度运行。

我们来看看 Delta Lake 事务日志的内容,以直观了解如何通过利用事务日志中的信息来优化 Delta 表查询。

Delta Lake 事务日志直观理解

我们来看看 Delta 表的基本结构

数据存储在 Parquet 文件中,事务元数据信息存储在事务日志(又名 _delta_log)中。

某些类型的查询可以通过仅查阅事务日志来解决。我们来看看一个具有代表性的事务日志条目:

{
  "commitInfo": {
    "delta-rs": "0.8.0",
    "timestamp": 1679608176735
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 1
  }
}
{
  "metaData": {
    "id": "efc48132-57a7-4b93-a0aa-7300bb0bdf94",
    "name": null,
    "description": null,
    "format": {
      "provider": "parquet",
      "options": {}
    },
    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"x\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}",
    "partitionColumns": [],
    "createdTime": 1679608176735,
    "configuration": {}
  }
}
{
  "add": {
    "path": "0-66a9cef6-c023-46a4-8714-995cad909191-0.parquet",
    "size": 1654,
    "partitionValues": {},
    "modificationTime": 1679608176735,
    "dataChange": true,
    "stats": "{\"numRecords\": 3, \"minValues\": {\"x\": 1}, \"maxValues\": {\"x\": 3}, \"nullCount\": {\"x\": 0}}",
    "tags": null
  }
}

仔细查看事务日志中包含 numRecords"stats" 部分。您可以通过查看 numRecords 来获取文件中的行数。select count(*) from some_table 查询可以通过简单地汇总给定 Delta 表版本中所有文件的 numRecords 来执行。

结论

这篇博客文章展示了 Spark 如何使用 Delta Lake 事务日志,使某些查询在 Delta 表中文件数量增长时运行得更快。

Delta Lake 是一个不断发展的项目,新功能和性能增强不断被添加。作为用户,您无需更改任何代码即可利用本文介绍的聚合优化。您只需升级您的 Delta Lake 版本即可立即享受这些性能增强带来的好处。

在 LinkedIn 上关注我们,获取有关新版本和 Delta Lake 功能的高质量信息。我们定期发布有用的信息,这将帮助您提升 Delta Lake 技能!

附录:数据生成脚本

我们使用以下代码片段生成了用于此基准分析的合成数据集:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def create_table(total_rows, total_files, table_base_path):
    rows_per_commit = 10_000_000
    num_commits = int(total_rows / rows_per_commit)
    num_files_per_commit = int(total_files / num_commits)
    table_path = (
        f"{table_base_path}/{int(total_rows / 1_000_000)}_mil_rows_{total_files}_files"
    )

    for i in range(num_commits):
        (
            spark.range(rows_per_commit)
            .withColumn("uuid", F.expr("uuid()"))
            .withColumn("year", F.col("id") % 22 + 2000)
            .withColumn("month", F.col("id") % 12)
            .withColumn("day", F.col("id") % 28)
            .withColumn("value", (F.col("id") % 1000).cast(StringType()))
            .repartition(num_files_per_commit)
            .write.format("delta")
            .mode("append")
            .save(table_path)
        )

create_table(100_000_000, 100, "/Users/matthew.powers/benchmarks/tables")
create_table(100_000_000, 1000, "/Users/matthew.powers/benchmarks/tables")
create_table(100_000_000, 10000, "/Users/matthew.powers/benchmarks/tables")
create_table(100_000_000, 100000, "/Users/matthew.powers/benchmarks/tables")
LinkedIn 上关注我们的作者