The Linux Foundation Projects
Delta Lake

使用 Delta Lake 和 Ray 运行 ML 工作流

作者:Jim Hibbard

第一部分:使用 Ray 读取 Delta Lake

Delta Lake 和 Ray 是两种开源技术,可用于构建可扩展且可靠的数据处理和机器学习管道。本文将解释这些工具各自扮演的角色,并向您展示如何使用新的 deltaray 库轻松集成这两种技术。本博客的第二部分即将发布,内容将是如何使用这种集成来运行 Ray 和 Delta Lake 的 ML 工作流。此笔记本包含本博客文章中使用的所有代码,如果您想跟着操作。

Delta Lake 仍然是湖仓一体最可靠、可扩展和高性能的表存储格式。它是 ML 应用程序的完美选择,因为它支持存储大量的结构化和非结构化数据,以约束的形式支持数据质量检查,并管理元数据。Delta Lake 还提供数据版本控制以及处理数据一致性和事务保证的功能。

Ray 是一个流行的分布式计算框架,使您能够在机器集群上构建和运行并行和分布式应用程序。使用 Ray,您可以轻松地将机器学习和 Python 工作负载分布到多个节点,从而加快训练时间并支持处理更大的数据集。Ray 还提供强化学习、最先进的超参数调优、数据处理等功能,使其成为机器学习应用程序的多功能平台。

安装库

您可以使用 pip 安装本文示例所需的库

pip install deltalake deltaray pandas

deltalake 是 delta-rs 项目的 Python 接口,delta-rs 是 Rust 中的原生 Delta Lake 实现。虽然 delta-rs 是用 Rust 编写的,但您可以像使用任何其他 Python 库一样使用它,并将受益于 Rust OSS 社区的持续创新。我们将使用 deltalake 库为 deltaray 创建虚拟 Delta 表以读取到 Ray 中。向 deltaray 添加写入功能是积极开发中的一个领域,社区可以帮助增强 Delta Lake 和 Ray 的集成。

Delta Lake 最初是为 Spark 构建的,但 deltalake 实现没有 Spark 依赖,使其成为一个更轻量级的库。delta-spark 依赖于 Spark,而 deltalake 不依赖。同样,我们将依赖 Ray 而不是 Spark 作为我们的数据处理框架。这突出了 Delta Lake 作为开放存储格式的灵活性,我们可以根据需要从许多不同的上下文和工具访问它。

创建版本化的 Delta Lake

创建要从 Ray 读取的虚拟 Delta 表很容易。您将导入所需的库,并为当前工作目录和虚拟数据集的表 URI 创建变量。

# Standard Libraries
import pathlib

# External Libraries
import deltalake as dl
import pandas as pd


cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'

然后,您将创建一个初始 Delta 表,向其中追加数据,然后用新数据覆盖该表,以便 Delta 表的事务日志将包含有关数据如何随时间变化的某些历史记录。

# create initial Delta Table
df = pd.DataFrame({'id': [0, 1, 2, ], })
dl.write_deltalake(table_uri, df)

# update one
df = pd.DataFrame({'id': [3, 4, 5, ], })
dl.write_deltalake(table_uri, df, mode='append')

# update two
df = pd.DataFrame({'id': [6, 7, 8, 9, ], })
dl.write_deltalake(table_uri, df, mode='overwrite')

使用 Ray 读取版本化的 Delta Lake

deltaray 库使从 Ray 读取 Delta 表变得容易。通过传入 table_uriversion,只要底层 parquet 文件可用,我们就可以在特定时间点访问我们的 Delta 表。使用 vacuum 命令节省存储成本是 Delta 表的先前版本可能变得不可用的一个示例,您可以在此处阅读有关 vacuum 的更多信息。

# External Libraries
import deltaray

# Read the initially created version of the Delta Lake table
ds = deltaray.read_delta(table_uri, version=0)
ds.show()

{'id': 0}
{'id': 1}
{'id': 2}
# Read the second version of the Delta Lake table
ds = deltaray.read_delta(table_uri, version=1)
ds.show()

{'id': 0}
{'id': 1}
{'id': 2}
{'id': 3}
{'id': 4}
{'id': 5}

如果您想要 Delta 表的最新版本,则根本不需要提供 version 参数。您可以看到结果与我们对虚拟数据集进行的最后一次更新相对应,该更新覆盖了所有以前的版本。

# Read the current version of the Delta Lake table
ds = deltaray.read_delta(table_uri)
ds.show()

{'id': 6}
{'id': 7}
{'id': 8}
{'id': 9}

讨论

当 Delta Lake 和 Ray 一起使用时,可以帮助您构建可扩展且可靠的机器学习管道。您可以使用 Ray 在集群中分发机器学习工作负载,并使用 Delta Lake 以可扩展、版本化和容错的方式存储和管理大型数据集。通过 deltaray,现在有了从 Ray 访问 Delta Lake 的 API,在您的机器学习管道中提供了两种技术之间的轻松集成。如果您有兴趣为 deltaray 做出贡献,下一步包括完全实现 Ray 的 Datasource API 以及改进测试和文档。

下一步

在本博客的第二部分中,我们将详细介绍一个完整的 ML 示例,包括超参数调优和模型选择。我们将使用 Delta Lake 作为训练数据的来源,deltaray 用于数据摄取,Ray 用于模型训练和选择。

LinkedIn 上关注我们的作者