The Linux Foundation Projects
Delta Lake

使用 Delta Sharing 0.5.0 共享 Delta 表的变更数据源

作者:Will Girten

我们很高兴 Delta Sharing 0.5.0 发布,它引入了多项增强功能,包括以下特性:

  • 启用 Delta 表变更数据源的共享 - 允许 Delta Sharing 客户端获取共享 Delta 表的增量更改(#135, #136, #137, #138, #140, #141, #142, #145, #146, #147, #148, #149, #150, #151, #152, #153, #155, #159)
  • 支持查询 Delta 表版本 - Python rest 客户端中新增了一个函数 query_table_version(),允许数据接收方查询共享 Delta 表的版本(#111
  • 改进了 Python 连接器的错误消息 - Python rest 客户端已更新,以包含来自共享服务器的更详细的响应正文(#124
  • 增强了 REST API 的错误消息 - Delta Sharing 服务器已扩展,以改进 GET Share、Schema 和 Table API 的错误代码和错误消息(#120
  • 更新了文档 - 协议和 REST API 文档得到了改进(#121, #128, #131

在这篇博文中,我们将介绍本次发布中一些流行的改进。

共享 Delta 表的变更数据源

本次发布的新功能是支持共享 Delta 表的变更数据源。这是数据接收方跟踪数据提供方发生的增量更改的绝佳方式。数据接收方现在可以只读取对表所做的更改,而无需重新读取整个数据集以获取最新快照。

启用变更数据源示例

数据提供方:启用变更数据源

使用 Delta Sharing 共享 Delta 表的变更数据源非常简单!首先,数据提供方在源 Delta 表上启用变更数据源。提供方可以通过更新表属性来在现有 Delta Lake 表上启用变更数据源。

# Enable CDF for an existing Delta table by updating the table properties
spark.sql(f"""
ALTER TABLE delta.`{cloud_storage_path}`
SET TBLPROPERTIES (delta.enableChangeDataFeed=true)
""")

对于新表,数据提供方还可以在创建时使用 DeltaTableBuilder API 启用变更数据源。

from delta import DeltaTable

# Enable CDF for a new Delta table using the `DeltaTableBuilder` API
DeltaTable.createOrReplace(spark) \
  .addColumn("ID", "INT") \
  .addColumn("crim", "DOUBLE") \
  .addColumn("zn", "DOUBLE") \
  .addColumn("indus", "DOUBLE") \
  .addColumn("chas", "INT") \
  .addColumn("nox", "DOUBLE") \
  .addColumn("rm", "DOUBLE") \
  .addColumn("age", "DOUBLE") \
  .property("delta.enableChangeDataFeed", "true") \
  .location(cloud_storage_path) \
  .execute()

数据提供方:更新服务器配置

最后,数据提供方必须更新所有应启用变更数据源的表的 cdfEnabled 属性。默认情况下,如果未在共享服务器配置中明确定义此值,则此值将设置为 false。此外,更改 cdfEnabled 属性的值会启用或禁用特定表的变更数据源共享。这是一个强大的功能,允许数据提供方通过配置选择要共享更改的表,同时不触及底层表。例如,下面是一个示例共享服务器配置,用于共享名为 boston-housing 的 Delta 表的变更数据源。

version: 1
shares:
  - name: "airbnbshare"
    schemas:
      - name: "listings"
        tables:
          - name: "nyc"
            location: "wasbs://airbnb@deltasharing.blob.core.windows.net/airbnb/nyc"
            cdfEnabled: true

数据接收方:查询表更改

在源表上将变更数据源作为 Delta 表属性启用,并在 Delta Sharing 服务器配置中将 cdfEnabled 属性设置为 true 后,数据接收方现在可以从共享客户端查询 Delta 表更改。在 0.5.0 版本中,Python 连接器添加了两个新函数用于读取共享 Delta 表的变更数据源:

  1. load_table_changes_as_spark() - 用于将更改作为 Apache Spark DataFrame 读取,以及
  2. load_table_changes_as_pandas() - 用于将表更改作为 Pandas DataFrame 读取。

查询表版本的新 API

0.5.0 版本中新增了一个附加函数 query_table_version(),已添加到 Python rest 客户端中。这个新函数允许数据接收方查询共享 Delta 表的版本。这是数据接收方快速检查他们正在使用的 Delta 表版本的绝佳方式。

改进了 Python Rest 客户端中的错误消息

此版本中还新增了对 Python rest 客户端错误处理的增强,以在 HTTPError 消息中包含共享服务器的响应正文。

以前,共享服务器的响应正文未包含,导致难以理解处理错误。通过包含来自共享服务器的响应正文,数据接收方可以快速确定共享服务器处理中出现的问题,并附带详细消息。

改进了共享服务器中的错误消息

此版本中还新增了共享服务器 TableManager 的改进错误消息。在以前的版本中,如果共享服务器未找到表、共享或模式,则会返回一个不太具描述性的消息,例如:schema 'invalid_schema' not found.

在 0.5.0 版本中,此错误消息已增强,以指示数据接收方应联系数据提供方。

接下来

我们对 Delta Sharing 的未来发布感到非常兴奋。我们目前正在开发的一个重要功能是启用从共享 Delta 表进行流式传输。请关注GitHub 里程碑中即将发布的版本和功能公告!

想开始使用 Delta Sharing 但不知道从何开始?今天就尝试一下快速入门示例吧!

致谢

我们衷心感谢为本次发布做出贡献的所有人,包括 Abhijit Chakankar、Alex Ott、Lin Zhou、Shixiong Zhu、William Chau、Xiaotong Sun、harksin、Kohei Toshimitsu 和 Vuong Nguyen。

LinkedIn 上关注我们的作者