在 Delta Lake 表上将 Ibis 与 PySpark 结合使用
这篇博文解释了如何使用 Ibis 和 PySpark 后端查询 Delta Lake。
Ibis 允许您使用各种引擎(如 pandas、DuckDB 和 Spark)以相同的语法查询数据。使用 Ibis 时,您可以无缝地从一个后端切换到另一个后端,而无需更改应用程序代码。例如,您可以使用 DuckDB 作为后端在本地开发,然后在生产环境中运行分析时将后端切换到 Spark。
Ibis 可以轻松地对存储在 CSV、Parquet、数据库或 Delta Lake 表中的数据运行查询。
让我们看一个如何使用 Ibis 查询 Delta Lake 表的示例,然后讨论 Ibis/Delta Lake 对 PySpark 用户的优势。这篇博文中的所有代码片段都可以在此笔记本中找到。
使用 PySpark 和 Ibis 查询 Delta Lake 表
本节向您展示如何创建 Delta Lake 表,然后使用 PySpark 后端通过 Ibis 查询它。
首先创建 Delta 表
df = spark.createDataFrame([(0, "Bob", 75), (1, "Sue", 25), (2, "Jim", 27)]).toDF(
"id", "name", "age"
)
df.write.format("delta").save("tmp/fun_people")
现在向表中追加更多数据
df = spark.createDataFrame([(8, "Larry", 19), (9, "Jerry", 69)]).toDF(
"id", "name", "age"
)
df.write.format("delta").mode("append").save("tmp/fun_people")
让我们检查 Delta 表的内容。
+---+-----+---+
| id| name|age|
+---+-----+---+
| 0| Bob| 75|
| 1| Sue| 25|
| 2| Jim| 27|
| 8|Larry| 19|
| 9|Jerry| 69|
+---+-----+---+
Delta 表包含来自初始写入操作的数据以及随后追加的数据。
让我们将 Delta Lake 作为 Spark 的临时视图公开
spark.read.format("delta").load("tmp/fun_people").createOrReplaceTempView("fun_people")
从临时视图创建 Ibis 表。
import ibis
con = ibis.pyspark.connect(spark)
table = con.table("fun_people")
对 Ibis 表运行查询,过滤数据集,只包含 50 岁或以上的人
table.filter(table.age >= 50)
id name age
0 9 Jerry 69
1 0 Bob 75
现在运行一个查询,简单地从 Delta 表中获取前两行数据
table.head(2)
id name age
0 9 Jerry 69
1 2 Jim 27
使用 PySpark 连接器,用 Ibis 语法查询 Delta 表非常容易。
现在让我们回溯到数据的原始版本,并使用 Ibis 运行相同的查询。
使用 Ibis 查询 Delta 表的先前版本
Delta Lake 支持时间旅行,因此您可以在 Delta 表的不同版本之间切换。
我们创建的 Delta 表具有以下两个版本。
让我们创建一个与 Delta 表版本 0 对应的视图,然后运行与以前相同的查询。
spark.read.format("delta").load("tmp/fun_people").createOrReplaceTempView(
"fun_people_v0"
)
对 Ibis 表运行查询,过滤数据集,只包含 Delta 表版本 0 中 50 岁或以上的人
table_v0.filter(table_v0.age >= 50)
id name age
0 0 Bob 75
这是原始结果的子集。
现在运行一个查询,简单地从 Delta 表版本 0 中获取前两行数据
table_v0.head(2)
id name age
0 2 Jim 27
1 0 Bob 75
Delta Lake 使使用 Ibis 查询数据的先前版本变得容易。现在让我们关注 Ibis 为 PySpark 用户提供的优势。
Ibis 对 PySpark 用户的优势
Ibis 为用户提供了编写代码的能力,这些代码可以在本地使用一个后端执行,在生产环境中使用另一个后端执行。
例如,您可以编写代码,在本地使用 DuckDB 后端在小数据集上执行,在生产环境中使用 PySpark 在大数据集上执行。这可以让您编写在本地执行更快的单元测试。它还可以弥合具有不同技术偏好的团队之间的差距。
已经熟悉 Ibis 的开发人员可以无缝地过渡到 PySpark 后端,而无需学习新的 DataFrame 查询语法。
大多数开发人员选择在本地和生产环境中编写 PySpark,所以这只是个人偏好问题。
Delta Lake 对 Ibis 用户的优势
Delta Lake 对 Ibis 用户很有用,原因与它对 pandas 或 PySpark 用户有利的原因相同。以下是一些原因:
- 版本化数据允许时间旅行
- ACID 事务保证
- 模式强制可防止您追加具有不匹配模式的数据
- 模式演进允许您随着时间的推移安全地更改表模式
- 生成列允许您根据其他列中的值一致且自动地填充某些列
- Delta Lake 支持高级合并命令
- 小文件可以轻松地通过 OPTIMIZE 进行压缩
- 约束和检查强制执行可以追加到列中的值
- 删除列非常快
- 您可以回滚到 Delta 表的早期版本以撤销错误
- 您可以轻松地从表中删除行
- 还有更多…
Delta Lake 具有大量对数据从业者至关重要的功能,而这些功能在 CSV 或 Parquet 数据湖中不可用。
与数据库不同,Delta Lake 灵活且适用于高级 AI 或 ML 工作负载。
Delta Lake 兼顾两全其美,请参阅《湖仓一体论文》了解更多信息。
结论
Ibis 是一种很酷的技术,用于编写与后端无关的代码。这提供了在不同执行环境或随着技术趋势演变时无缝切换后端的可能性。
假设您有一个为 Clickhouse 编写的 40,000 行代码库。将此代码库转换为 DuckDB 可能会很困难。如果您改用 Ibis 编写此代码,则转换将很容易(只要您不使用 Clickhouse 特定的功能)。
如果您想了解更多信息,请参阅这篇关于使用 Ibis 扩展到 Apache Spark 的博文。