The Linux Foundation Projects
Delta Lake

Python deltalake 0.12.0 版本的新功能

作者:Ion Koutsouris

Python deltalake 0.12.0 版本是一个里程碑式的版本,它引入了一系列强大的 Delta 操作,例如 MERGE(upserts)、DELETE 和 UPDATE。此外,此版本还带来了显著的性能增强,将读取性能提高了多达 10 倍。同时,它还显著改善了写入时与大型 Arrow 数据类型的互操作性。

MERGE 操作:轻松集成数据

数据合并是 Delta Lake 提供的一种最通用的操作。它可以同时处理更新、删除和插入,使其成为数据工程师和数据科学家的一项关键工具。一些用例包括处理缓慢变化的维度 (SCD)、仅处理更改的数据 (CDC)、写入时处理去重以及各种其他情况。让我们通过一个真实世界的例子来探索 MERGE 的功能。

轻松实现 Upserts

想象一下,你有一个源表或 DataFrame,并且你想要使用 .merge()when_matched_update()when_not_matched_upsert() 将其数据 upsert 到目标表。让我们考虑一个场景,我们有 SalesOrder 表的初始加载。

from deltalake import DeltaTable, write_deltalake
from datetime import datetime
import polars as pl

df = pl.DataFrame(
    {
        "sales_order_id": ["1000", "1001", "1002", "1003"],
        "product": ["bike", "scooter", "car", "motorcycle"],
        "order_date": [
            datetime(2023, 1, 1),
            datetime(2023, 1, 5),
            datetime(2023, 1, 10),
            datetime(2023, 2, 1),
        ],
        "sales_price": [120.25, 2400, 32000, 9000],
        "paid_by_customer": [True, False, False, True],
    }
)
print(df)

df.write_delta("sales_orders", mode="append")
    ┌────────────────┬────────────┬─────────────────────┬─────────────┬──────────────────┐
    │ sales_order_id ┆ product    ┆ order_date          ┆ sales_price ┆ paid_by_customer │
    │ ---            ┆ ---        ┆ ---                 ┆ ---         ┆ ---              │
    │ str            ┆ str        ┆ datetime[μs]        ┆ f64         ┆ bool             │
    ╞════════════════╪════════════╪═════════════════════╪═════════════╪══════════════════╡
    │ 1000           ┆ bike       ┆ 2023-01-01 00:00:00 ┆ 120.25      ┆ true             │
    │ 1001           ┆ scooter    ┆ 2023-01-05 00:00:00 ┆ 2400.0      ┆ false            │
    │ 1002           ┆ car        ┆ 2023-01-10 00:00:00 ┆ 32000.0     ┆ false            │
    │ 1003           ┆ motorcycle ┆ 2023-02-01 00:00:00 ┆ 9000.0      ┆ true             │
    └────────────────┴────────────┴─────────────────────┴─────────────┴──────────────────┘

上面的代码演示了如何将初始数据写入 Delta 表。现在,假设我们收到一批新的更改数据,其中包括更新,因为一些客户已经支付了订单,并且我们已经调整了某些产品的价格。我们还进行了一笔新的汽车销售。

new_data = pl.DataFrame(
    {
        "sales_order_id": ["1002", "1004"],
        "product": ["car", "car"],
        "order_date": [datetime(2023, 1, 10), datetime(2023, 2, 5)],
        "sales_price": [30000.0, 40000.0],
        "paid_by_customer": [True, True],
    }
)

要执行 upsert,我们将销售订单表加载为 DeltaTable 对象,并使用 .merge() 操作来指定源数据和匹配记录的谓词。然后使用 when_matched_update_all()when_not_matched_insert_all() 函数来指定如何处理所有列的更新和插入。最后,使用 execute() 执行操作。

你可以选择提供 source_aliastarget_aliaserror_on_type_mismatch 参数。默认情况下,如果存在类型不匹配,合并操作将报错。

from polars.io.delta import _convert_pa_schema_to_delta

dt = DeltaTable("sales_orders")
source = new_data.to_arrow()
delta_schema = _convert_pa_schema_to_delta(source.schema)
source = source.cast(delta_schema)

(
    dt.merge(
        source=source,
        predicate="s.sales_order_id = t.sales_order_id",
        source_alias="s",
        target_alias="t",
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)
{'num_source_rows': 2,
    'num_target_rows_inserted': 1,
    'num_target_rows_updated': 1,
    'num_target_rows_deleted': 0,
    'num_target_rows_copied': 3,
    'num_output_rows': 5,
    'num_target_files_added': 1,
    'num_target_files_removed': 1,
    'execution_time_ms': 51,
    'scan_time_ms': 0,
    'rewrite_time_ms': 45}

我们可以在指标中看到,一个 target_row 被插入,一个被更新。如果我们使用 Polars 读取 delta 表,我们可以看到 order_id 1002 的 sales_price 和 paid_by_customer 字段已更新,并且我们添加了一个新的销售,order_id 为 1004。

print(pl.read_delta("sales_orders"))
┌────────────────┬────────────┬─────────────────────┬─────────────┬──────────────────┐
│ sales_order_id ┆ product    ┆ order_date          ┆ sales_price ┆ paid_by_customer │
│ ---            ┆ ---        ┆ ---                 ┆ ---         ┆ ---              │
│ str            ┆ str        ┆ datetime[μs]        ┆ f64         ┆ bool             │
╞════════════════╪════════════╪═════════════════════╪═════════════╪══════════════════╡
│ 1002           ┆ car        ┆ 2023-01-10 00:00:00 ┆ 30000.0     ┆ true             │
│ 1000           ┆ bike       ┆ 2023-01-01 00:00:00 ┆ 120.25      ┆ true             │
│ 1001           ┆ scooter    ┆ 2023-01-05 00:00:00 ┆ 2400.0      ┆ false            │
│ 1003           ┆ motorcycle ┆ 2023-02-01 00:00:00 ┆ 9000.0      ┆ true             │
│ 1004           ┆ car        ┆ 2023-02-05 00:00:00 ┆ 40000.0     ┆ true             │
└────────────────┴────────────┴─────────────────────┴─────────────┴──────────────────┘

这个真实世界的例子说明了 Delta-RS 中的 MERGE 操作如何简化数据集成,使你能够无缝处理更改、更新和插入。有关 Merge 允许你执行的更多示例,请查看Delta Lake Merge 博客文章。

UPDATE 操作:微调数据

有时,你只需要根据谓词对数据进行特定更新。在这种情况下,.update() 操作就派上用场了。让我们考虑一个简单的例子,我们需要更正“product”列中的拼写错误。

df = pl.DataFrame({"id": [1, 2, 3], "product": ["appl", "apple", "kiwi"]})
df.write_delta("update_test", mode="overwrite")

数据就位后,我们加载 Delta 表并使用 .update() 操作来修复满足谓词的“product”列中的拼写错误。很快,你将能够将 Python 对象作为输入传递.update() 操作,使其更加灵活。

dt = DeltaTable("update_test")

dt.update(updates={"product": "'apple'"}, predicate="product = 'appl'")
print(pl.read_delta("update_test"))
┌─────┬─────────┐
│ id  ┆ product │
│ --- ┆ ---     │
│ i64 ┆ str     │
╞═════╪═════════╡
│ 1   ┆ apple   │
│ 2   ┆ apple   │
│ 3   ┆ kiwi    │
└─────┴─────────┘

结果是我们的 Delta 表中“product”列已更正,展示了 UPDATE 操作如何让你轻松微调数据。

DELETE 操作:管理数据清理

DELETE 操作是数据清理和管理的强大工具。例如,你可以使用它从表中删除软删除的记录。以下是它的工作原理示例。

df = pl.DataFrame({"id": [1, 2, 3], "deleted": [False, True, True]})
df.write_delta("delete_test", mode="overwrite")

加载 Delta 表后,我们使用带有谓词“deleted = True”的 .delete() 操作来删除标记的记录。如果没有提供谓词,则删除所有记录。

dt = DeltaTable("delete_test")

dt.delete(predicate="deleted = True")
print(pl.read_delta("delete_test"))
┌─────┬─────────┐
│ id  ┆ deleted │
│ --- ┆ ---     │
│ i64 ┆ bool    │
╞═════╪═════════╡
│ 1   ┆ false   │
└─────┴─────────┘

结果是一个清理后的 Delta 表,只包含未删除的记录,说明 DELETE 操作是数据管理的重要操作。

增强与大型 Arrow 数据的互操作性

在此版本中,Delta-RS 引入了与大型 Arrow 数据类型的增强互操作性。现在,你只需启用 large_dtypes=True,即可使用 write_deltalake 直接对包含大型 Arrow 类型的 Arrow 表进行连续写入。

df = pl.DataFrame({"id": [1, 2, 3], "deleted": [False, True, False]})
source = df.to_arrow()
print(f"schema: {source.schema}\n")
write_deltalake("sample_table", source, large_dtypes=True, mode="overwrite")

dt = DeltaTable("sample_table")
print(f"history: {dt.history()[0]}")
schema:
    id: int64
    deleted: bool

history: {'timestamp': 1697974132175, 'operation': 'WRITE', 'operationParameters': {'mode': 'Overwrite', 'partitionBy': '[]'}, 'clientVersion': 'delta-rs.0.17.0', 'version': 7}

改进的互操作性实现了与 Polars 等库更顺畅的集成,你可以在其中使用大型数据类型而无需转换。

结论:弥合数据工作负载的鸿沟

Delta-RS 的 0.12.0 Python 版本直接为 Python 用户带来了许多强大的操作,例如 MERGE、DELETE 和 UPDATE。这使得在非 JVM 工作负载中工作的数据科学家和数据工程师能够构建和增强 Lakehouse 解决方案。它弥合了通常使用 Polars、DuckDB 或 Pandas 等工具处理的中小型数据工作负载与使用 Spark 管理的大数据工作负载之间的鸿沟。

衷心感谢 Delta-RS 的出色贡献者,为构建 Rust API 付出了大量努力!

Delta-RS 的未来展望

  • 添加对表功能(读取支持 V3、写入支持 V7)的支持并改进整体协议支持
  • 在 Delta-RS 中集成 Rust Delta 内核
  • 在 MERGE 中使用逻辑计划以提高性能
  • Python MERGE 中的多个“when”子句
  • 向 Python 公开 FSCK(修复)操作
  • 向事务日志添加 VACUUM