使用 Delta Sharing 0.5.0 共享 Delta 表的变更数据源
作者:Will Girten
我们很高兴 Delta Sharing 0.5.0 发布,它引入了多项增强功能,包括以下特性:
- 启用 Delta 表变更数据源的共享 - 允许 Delta Sharing 客户端获取共享 Delta 表的增量更改(#135, #136, #137, #138, #140, #141, #142, #145, #146, #147, #148, #149, #150, #151, #152, #153, #155, #159)
- 支持查询 Delta 表版本 - Python rest 客户端中新增了一个函数
query_table_version()
,允许数据接收方查询共享 Delta 表的版本(#111) - 改进了 Python 连接器的错误消息 - Python rest 客户端已更新,以包含来自共享服务器的更详细的响应正文(#124)
- 增强了 REST API 的错误消息 - Delta Sharing 服务器已扩展,以改进
GET
Share、Schema 和 Table API 的错误代码和错误消息(#120) - 更新了文档 - 协议和 REST API 文档得到了改进(#121, #128, #131)
在这篇博文中,我们将介绍本次发布中一些流行的改进。
共享 Delta 表的变更数据源
本次发布的新功能是支持共享 Delta 表的变更数据源。这是数据接收方跟踪数据提供方发生的增量更改的绝佳方式。数据接收方现在可以只读取对表所做的更改,而无需重新读取整个数据集以获取最新快照。
启用变更数据源示例
数据提供方:启用变更数据源
使用 Delta Sharing 共享 Delta 表的变更数据源非常简单!首先,数据提供方在源 Delta 表上启用变更数据源。提供方可以通过更新表属性来在现有 Delta Lake 表上启用变更数据源。
# Enable CDF for an existing Delta table by updating the table properties
spark.sql(f"""
ALTER TABLE delta.`{cloud_storage_path}`
SET TBLPROPERTIES (delta.enableChangeDataFeed=true)
""")
对于新表,数据提供方还可以在创建时使用 DeltaTableBuilder
API 启用变更数据源。
from delta import DeltaTable
# Enable CDF for a new Delta table using the `DeltaTableBuilder` API
DeltaTable.createOrReplace(spark) \
.addColumn("ID", "INT") \
.addColumn("crim", "DOUBLE") \
.addColumn("zn", "DOUBLE") \
.addColumn("indus", "DOUBLE") \
.addColumn("chas", "INT") \
.addColumn("nox", "DOUBLE") \
.addColumn("rm", "DOUBLE") \
.addColumn("age", "DOUBLE") \
.property("delta.enableChangeDataFeed", "true") \
.location(cloud_storage_path) \
.execute()
数据提供方:更新服务器配置
最后,数据提供方必须更新所有应启用变更数据源的表的 cdfEnabled
属性。默认情况下,如果未在共享服务器配置中明确定义此值,则此值将设置为 false
。此外,更改 cdfEnabled
属性的值会启用或禁用特定表的变更数据源共享。这是一个强大的功能,允许数据提供方通过配置选择要共享更改的表,同时不触及底层表。例如,下面是一个示例共享服务器配置,用于共享名为 boston-housing
的 Delta 表的变更数据源。
version: 1
shares:
- name: "airbnbshare"
schemas:
- name: "listings"
tables:
- name: "nyc"
location: "wasbs://airbnb@deltasharing.blob.core.windows.net/airbnb/nyc"
cdfEnabled: true
数据接收方:查询表更改
在源表上将变更数据源作为 Delta 表属性启用,并在 Delta Sharing 服务器配置中将 cdfEnabled
属性设置为 true
后,数据接收方现在可以从共享客户端查询 Delta 表更改。在 0.5.0 版本中,Python 连接器添加了两个新函数用于读取共享 Delta 表的变更数据源:
load_table_changes_as_spark()
- 用于将更改作为 Apache Spark DataFrame 读取,以及load_table_changes_as_pandas()
- 用于将表更改作为 Pandas DataFrame 读取。
查询表版本的新 API
0.5.0 版本中新增了一个附加函数 query_table_version()
,已添加到 Python rest 客户端中。这个新函数允许数据接收方查询共享 Delta 表的版本。这是数据接收方快速检查他们正在使用的 Delta 表版本的绝佳方式。
改进了 Python Rest 客户端中的错误消息
此版本中还新增了对 Python rest 客户端错误处理的增强,以在 HTTPError
消息中包含共享服务器的响应正文。
以前,共享服务器的响应正文未包含,导致难以理解处理错误。通过包含来自共享服务器的响应正文,数据接收方可以快速确定共享服务器处理中出现的问题,并附带详细消息。
改进了共享服务器中的错误消息
此版本中还新增了共享服务器 TableManager
的改进错误消息。在以前的版本中,如果共享服务器未找到表、共享或模式,则会返回一个不太具描述性的消息,例如:schema 'invalid_schema' not found.
在 0.5.0 版本中,此错误消息已增强,以指示数据接收方应联系数据提供方。
接下来
我们对 Delta Sharing 的未来发布感到非常兴奋。我们目前正在开发的一个重要功能是启用从共享 Delta 表进行流式传输。请关注GitHub 里程碑中即将发布的版本和功能公告!
想开始使用 Delta Sharing 但不知道从何开始?今天就尝试一下快速入门示例吧!
致谢
我们衷心感谢为本次发布做出贡献的所有人,包括 Abhijit Chakankar、Alex Ott、Lin Zhou、Shixiong Zhu、William Chau、Xiaotong Sun、harksin、Kohei Toshimitsu 和 Vuong Nguyen。