Python deltalake 0.12.0 版本的新功能
Python deltalake 0.12.0 版本是一个里程碑式的版本,它引入了一系列强大的 Delta 操作,例如 MERGE(upserts)、DELETE 和 UPDATE。此外,此版本还带来了显著的性能增强,将读取性能提高了多达 10 倍。同时,它还显著改善了写入时与大型 Arrow 数据类型的互操作性。
MERGE 操作:轻松集成数据
数据合并是 Delta Lake 提供的一种最通用的操作。它可以同时处理更新、删除和插入,使其成为数据工程师和数据科学家的一项关键工具。一些用例包括处理缓慢变化的维度 (SCD)、仅处理更改的数据 (CDC)、写入时处理去重以及各种其他情况。让我们通过一个真实世界的例子来探索 MERGE 的功能。
轻松实现 Upserts
想象一下,你有一个源表或 DataFrame,并且你想要使用 .merge()
、when_matched_update()
和 when_not_matched_upsert()
将其数据 upsert 到目标表。让我们考虑一个场景,我们有 SalesOrder 表的初始加载。
from deltalake import DeltaTable, write_deltalake
from datetime import datetime
import polars as pl
df = pl.DataFrame(
{
"sales_order_id": ["1000", "1001", "1002", "1003"],
"product": ["bike", "scooter", "car", "motorcycle"],
"order_date": [
datetime(2023, 1, 1),
datetime(2023, 1, 5),
datetime(2023, 1, 10),
datetime(2023, 2, 1),
],
"sales_price": [120.25, 2400, 32000, 9000],
"paid_by_customer": [True, False, False, True],
}
)
print(df)
df.write_delta("sales_orders", mode="append")
┌────────────────┬────────────┬─────────────────────┬─────────────┬──────────────────┐
│ sales_order_id ┆ product ┆ order_date ┆ sales_price ┆ paid_by_customer │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ datetime[μs] ┆ f64 ┆ bool │
╞════════════════╪════════════╪═════════════════════╪═════════════╪══════════════════╡
│ 1000 ┆ bike ┆ 2023-01-01 00:00:00 ┆ 120.25 ┆ true │
│ 1001 ┆ scooter ┆ 2023-01-05 00:00:00 ┆ 2400.0 ┆ false │
│ 1002 ┆ car ┆ 2023-01-10 00:00:00 ┆ 32000.0 ┆ false │
│ 1003 ┆ motorcycle ┆ 2023-02-01 00:00:00 ┆ 9000.0 ┆ true │
└────────────────┴────────────┴─────────────────────┴─────────────┴──────────────────┘
上面的代码演示了如何将初始数据写入 Delta 表。现在,假设我们收到一批新的更改数据,其中包括更新,因为一些客户已经支付了订单,并且我们已经调整了某些产品的价格。我们还进行了一笔新的汽车销售。
new_data = pl.DataFrame(
{
"sales_order_id": ["1002", "1004"],
"product": ["car", "car"],
"order_date": [datetime(2023, 1, 10), datetime(2023, 2, 5)],
"sales_price": [30000.0, 40000.0],
"paid_by_customer": [True, True],
}
)
要执行 upsert,我们将销售订单表加载为 DeltaTable 对象,并使用 .merge()
操作来指定源数据和匹配记录的谓词。然后使用 when_matched_update_all()
和 when_not_matched_insert_all()
函数来指定如何处理所有列的更新和插入。最后,使用 execute()
执行操作。
你可以选择提供 source_alias
、target_alias
和 error_on_type_mismatch
参数。默认情况下,如果存在类型不匹配,合并操作将报错。
from polars.io.delta import _convert_pa_schema_to_delta
dt = DeltaTable("sales_orders")
source = new_data.to_arrow()
delta_schema = _convert_pa_schema_to_delta(source.schema)
source = source.cast(delta_schema)
(
dt.merge(
source=source,
predicate="s.sales_order_id = t.sales_order_id",
source_alias="s",
target_alias="t",
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute()
)
{'num_source_rows': 2,
'num_target_rows_inserted': 1,
'num_target_rows_updated': 1,
'num_target_rows_deleted': 0,
'num_target_rows_copied': 3,
'num_output_rows': 5,
'num_target_files_added': 1,
'num_target_files_removed': 1,
'execution_time_ms': 51,
'scan_time_ms': 0,
'rewrite_time_ms': 45}
我们可以在指标中看到,一个 target_row 被插入,一个被更新。如果我们使用 Polars 读取 delta 表,我们可以看到 order_id 1002 的 sales_price 和 paid_by_customer 字段已更新,并且我们添加了一个新的销售,order_id 为 1004。
print(pl.read_delta("sales_orders"))
┌────────────────┬────────────┬─────────────────────┬─────────────┬──────────────────┐
│ sales_order_id ┆ product ┆ order_date ┆ sales_price ┆ paid_by_customer │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ datetime[μs] ┆ f64 ┆ bool │
╞════════════════╪════════════╪═════════════════════╪═════════════╪══════════════════╡
│ 1002 ┆ car ┆ 2023-01-10 00:00:00 ┆ 30000.0 ┆ true │
│ 1000 ┆ bike ┆ 2023-01-01 00:00:00 ┆ 120.25 ┆ true │
│ 1001 ┆ scooter ┆ 2023-01-05 00:00:00 ┆ 2400.0 ┆ false │
│ 1003 ┆ motorcycle ┆ 2023-02-01 00:00:00 ┆ 9000.0 ┆ true │
│ 1004 ┆ car ┆ 2023-02-05 00:00:00 ┆ 40000.0 ┆ true │
└────────────────┴────────────┴─────────────────────┴─────────────┴──────────────────┘
这个真实世界的例子说明了 Delta-RS 中的 MERGE 操作如何简化数据集成,使你能够无缝处理更改、更新和插入。有关 Merge 允许你执行的更多示例,请查看Delta Lake Merge 博客文章。
UPDATE 操作:微调数据
有时,你只需要根据谓词对数据进行特定更新。在这种情况下,.update()
操作就派上用场了。让我们考虑一个简单的例子,我们需要更正“product”列中的拼写错误。
df = pl.DataFrame({"id": [1, 2, 3], "product": ["appl", "apple", "kiwi"]})
df.write_delta("update_test", mode="overwrite")
数据就位后,我们加载 Delta 表并使用 .update() 操作来修复满足谓词的“product”列中的拼写错误。很快,你将能够将 Python 对象作为输入传递给 .update()
操作,使其更加灵活。
dt = DeltaTable("update_test")
dt.update(updates={"product": "'apple'"}, predicate="product = 'appl'")
print(pl.read_delta("update_test"))
┌─────┬─────────┐
│ id ┆ product │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═════════╡
│ 1 ┆ apple │
│ 2 ┆ apple │
│ 3 ┆ kiwi │
└─────┴─────────┘
结果是我们的 Delta 表中“product”列已更正,展示了 UPDATE 操作如何让你轻松微调数据。
DELETE 操作:管理数据清理
DELETE 操作是数据清理和管理的强大工具。例如,你可以使用它从表中删除软删除的记录。以下是它的工作原理示例。
df = pl.DataFrame({"id": [1, 2, 3], "deleted": [False, True, True]})
df.write_delta("delete_test", mode="overwrite")
加载 Delta 表后,我们使用带有谓词“deleted = True”的 .delete()
操作来删除标记的记录。如果没有提供谓词,则删除所有记录。
dt = DeltaTable("delete_test")
dt.delete(predicate="deleted = True")
print(pl.read_delta("delete_test"))
┌─────┬─────────┐
│ id ┆ deleted │
│ --- ┆ --- │
│ i64 ┆ bool │
╞═════╪═════════╡
│ 1 ┆ false │
└─────┴─────────┘
结果是一个清理后的 Delta 表,只包含未删除的记录,说明 DELETE 操作是数据管理的重要操作。
增强与大型 Arrow 数据的互操作性
在此版本中,Delta-RS 引入了与大型 Arrow 数据类型的增强互操作性。现在,你只需启用 large_dtypes=True
,即可使用 write_deltalake
直接对包含大型 Arrow 类型的 Arrow 表进行连续写入。
df = pl.DataFrame({"id": [1, 2, 3], "deleted": [False, True, False]})
source = df.to_arrow()
print(f"schema: {source.schema}\n")
write_deltalake("sample_table", source, large_dtypes=True, mode="overwrite")
dt = DeltaTable("sample_table")
print(f"history: {dt.history()[0]}")
schema:
id: int64
deleted: bool
history: {'timestamp': 1697974132175, 'operation': 'WRITE', 'operationParameters': {'mode': 'Overwrite', 'partitionBy': '[]'}, 'clientVersion': 'delta-rs.0.17.0', 'version': 7}
改进的互操作性实现了与 Polars 等库更顺畅的集成,你可以在其中使用大型数据类型而无需转换。
结论:弥合数据工作负载的鸿沟
Delta-RS 的 0.12.0 Python 版本直接为 Python 用户带来了许多强大的操作,例如 MERGE、DELETE 和 UPDATE。这使得在非 JVM 工作负载中工作的数据科学家和数据工程师能够构建和增强 Lakehouse 解决方案。它弥合了通常使用 Polars、DuckDB 或 Pandas 等工具处理的中小型数据工作负载与使用 Spark 管理的大数据工作负载之间的鸿沟。
衷心感谢 Delta-RS 的出色贡献者,为构建 Rust API 付出了大量努力!
Delta-RS 的未来展望
- 添加对表功能(读取支持 V3、写入支持 V7)的支持并改进整体协议支持
- 在 Delta-RS 中集成 Rust Delta 内核
- 在 MERGE 中使用逻辑计划以提高性能
- Python MERGE 中的多个“when”子句
- 向 Python 公开 FSCK(修复)操作
- 向事务日志添加 VACUUM