无需 Spark 的 Delta Lake
作者:Avril Aysha
这篇文章将向您展示如何在不使用 Spark 的情况下使用 Delta Lake。
您可能希望在不使用 Spark 的情况下使用 Delta Lake,原因如下:
- 您不想学习 Spark
- 您的团队不使用 Spark
- 您不想使用 Java 虚拟机 (JVM)
- 您正在处理相对较小的数据集
您可以使用许多其他语言(如 SQL、Python 和 Rust)在不使用 Spark 的情况下使用 Delta Lake。本文将向您展示在不使用 Spark 的情况下使用 Delta Lake 的最流行方式的示例。
让我们开始吧!🪂
如何在不使用 Spark 的情况下使用 Delta Lake
有许多方法可以在不使用 Spark 的情况下使用 Delta Lake。
为了清晰起见,我们将其分为两类:
- 专用的 Delta 连接器 允许您从 Flink、Hive、Trino、PrestoDB 等引擎使用 Delta Lake
- delta-rs 包允许您在 Rust 或 Python 中使用 Delta Lake,例如与 pandas、polars、Dask、Daft、DuckDB 等一起使用
本文将向您展示每种选项的简短代码示例,以便在不使用 Spark 的情况下使用 Delta Lake。您还可以在 Delta Lake 网站上找到完整的集成列表。
无需 Spark 的 Delta Lake:专用连接器
许多非 Spark 查询引擎都有专用连接器来使用 Delta Lake。这些都基于 Delta Standalone:一个用于 Java/Scala 的 JVM 库,可用于读取和写入 Delta 表。您可以使用 Delta Standalone 为未在集成页面上列出的服务构建自己的 Delta 连接器。
注意:如果您想完全避免 JVM,请参阅下面的 delta-rs 部分
您可以通过以下专用 Delta 连接器在不使用 Spark 的情况下使用 Delta Lake:
- Apache Flink
- Apache Hive
- PrestoDB
- Trino
- Amazon Athena
- Snowflake
- Google BigQuery
- Microsoft Fabric
其中一些连接器支持有限的 Delta Lake 功能。请务必查看每个连接器的“已知限制”部分以了解更多信息。
无需 Spark 的 Delta Lake:Apache Flink
您可以使用 Flink/Delta 连接器 从 Apache Flink 使用 Delta Lake。该连接器支持批处理和流式模式的数据写入。
连接器包括:
DeltaSink
用于将数据从 Apache Flink 写入 Delta 表。DeltaSource
用于使用 Apache Flink 读取 Delta 表。
您可以将 Delta Lake 与 Flink Python 或 SQL API 一起使用。
Delta Lake 与 Flink:Python
下面的代码是如何使用分区列 surname
将数据写入分区表的示例。
import io.delta.flink.sink.DeltaBucketAssigner;
import io.delta.flink.sink.DeltaSinkBuilder;
public DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath) {
String[] partitionCols = { "surname" };
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.withPartitionColumns(partitionCols)
.build();
stream.sinkTo(deltaSink);
return stream;
}
您还可以从 Apache Flink 使用 Delta Lake 的时间旅行功能。例如,像这样:
public DataStream<RowData> createBoundedDeltaSourceWithTimeTravel(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
// could also use `.versionAsOf(314159)`
.timestampAsOf("2022-06-28 04:55:00")
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Delta Lake 与 Flink:SQL
从 Flink 3.0.0 版本开始,Delta 连接器可用于 Flink SQL 作业。Delta Source 和 Delta Sink 都可以用作 Flink 表,用于 SELECT 和 INSERT 查询。
例如,您可以使用以下方式将整个 Delta 表加载到另一个 Delta 表中:
INSERT INTO sinkTable SELECT * FROM sourceTable;
在上述 SQL 查询中,sourceTable
和 sinkTable
都指使用 Delta/Flink 连接器配置的 Delta 表。表模式必须匹配。
或者创建新的分区表:
CREATE TABLE testTable (
id BIGINT,
data STRING,
part_a STRING,
part_b STRING
)
PARTITIONED BY (part_a, part_b);
WITH (
'connector' = 'delta',
'table-path' = '<path-to-table>',
'<arbitrary-user-define-table-property' = '<value>',
'<delta.*-properties>' = '<value'>
);
您无需编写任何 Spark 即可将 Delta Lake 与 Apache Flink 一起使用。
请注意,Flink/Delta SQL 连接器必须与 Delta Catalog 一起使用。尝试在未配置 Delta Catalog 的情况下使用 Flink API 对 Delta 表执行 SQL 查询将导致 SQL 作业失败。
已知限制
- 目前仅支持
append
写入操作;不支持overwrite
或upsert
。 - Azure Blob Storage 目前仅支持读取。由于类着色问题,Flink 不支持写入 Azure Blob Storage,请参见 此问题。
- 对于 AWS S3 存储,为了确保来自不同集群的并发事务写入,请使用 多集群配置指南。有关如何在 Flink Delta Sink 中使用此配置的示例,请参见 此示例。
- Delta SQL 连接器目前仅支持物理列。元数据和计算列目前不支持。有关详细信息,请参见 此处。
无需 Spark 的 Delta Lake:Apache Hive
您可以使用 Hive 连接器 从 Apache Hive 使用 Delta Lake。您可以使用此连接器查询 Hive 中的 Delta 表数据。您不能使用它将数据从 Hive 写入 Delta 表。
要使用 Delta Lake,您需要定义一个指向 Delta 表的外部 Hive 表,例如在 S3 上,像这样:
CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION '/delta/table/path'
CREATE TABLE 语句中的表模式应与您正在读取的 Delta 表的模式匹配。
定义外部 Hive 表后,您可以按如下方式查询它:
select * from deltaTable;
您无需编写任何 Spark 即可将 Delta Lake 与 Apache Hive 一起使用。
已知限制
- 此连接器是只读的。不支持任何写入操作。
- 仅支持外部 Hive 表。Delta 表必须在使用外部 Hive 表引用它之前使用 Spark 创建。
无需 Spark 的 Delta Lake:PrestoDB
您可以使用 PrestoDB 连接器 从 Presto 使用 Delta Lake。此连接器基于 Hive 连接器,并共享许多相同的配置选项。
您的 Delta 表需要注册到 Hive 元数据存储中。
您可以按如下方式从 S3 上现有的 Delta 表创建 Presto 表:
CREATE TABLE sales.apac.sales_data_new (sampleColumn INT)
WITH (external_location = 's3://db-sa-datasets/presto/sales_data_new');
要在 Hive 元数据存储中注册表,您不需要传递表的完整模式,因为 Delta Lake 连接器从位于 Delta Lake 表位置的元数据中获取模式。为了避免 Hive 元数据存储中的 no columns error
,请提供一个示例列作为正在注册的 Delta 表的模式。
要访问已作为 apac
数据库和 sales
目录的一部分注册到 Hive 元数据存储中的 Delta 表 sales_data
,您可以简单地运行:
SELECT * FROM sales.apac.sales_data LIMIT 200;
您还可以通过传递 path
直接从 S3 查询 Delta 表:
SELECT * FROM sales."$path$"."s3://db-sa-datasets/presto/sales_data" LIMIT 200;
您可以通过将版本添加到表名来回溯到 Delta 表的特定版本,如下所示:
SELECT * FROM sales.apac."sales_data@v4" LIMIT 200;
已知限制
- 此连接器是只读的。不支持任何写入操作。
- 此连接器重用了 Hive 连接器中存在的许多模块,例如用于连接和安全性(如 S3、Azure Data Lake、AWS Glue 元数据存储等)。这些模块的配置与 Hive 连接器文档 中提供的配置相同。\
您无需编写任何 Spark 即可将 Delta Lake 与 PrestoDB 一起使用。
无需 Spark 的 Delta Lake:Trino
您可以使用 Trino 连接器 从 Trino 使用 Delta Lake。然后您可以使用 SQL 查询和转换您的 Delta 表。请注意,您的 Delta 表必须注册到元数据存储中,例如 Hive 元数据存储或 AWS Glue。
Trino 连接器支持读取和写入操作。您可以 追加、覆盖和合并 您的 Delta 表。
例如,您可以创建 Delta Lake 表,添加一些数据,修改一些数据,然后添加更多数据,像这样:
CREATE TABLE users(id int, name varchar) WITH (column_mapping_mode = 'name');
INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Mallory');
ALTER TABLE users DROP COLUMN name;
INSERT INTO users VALUES 4;
使用以下语句查看表中的所有数据:
> SELECT * FROM users ORDER BY id;
id
----
1
2
3
4
使用 $history
元数据表查看过去操作的记录:
> SELECT version, timestamp, operation
> FROM "users$history";
version | timestamp | operation
---------+------------------------------------+--------------
0 | 2024-04-10 17:49:18.528 Asia/Tokyo | CREATE TABLE
1 | 2024-04-10 17:49:18.755 Asia/Tokyo | WRITE
2 | 2024-04-10 17:49:18.929 Asia/Tokyo | DROP COLUMNS
3 | 2024-04-10 17:49:19.137 Asia/Tokyo | WRITE
然后回溯到版本 1:
> SELECT *
> FROM users FOR VERSION AS OF 1;
id | name
----+---------
1 | Alice
2 | Bob
3 | Mallory
您无需编写任何 Spark 即可将 Delta Lake 与 Trino 一起使用。
写入云存储
- 默认情况下,写入 Azure ADLS Gen2 和 Google Cloud Storage 是启用的。Trino 在从多个 Trino 集群或其他查询引擎写入时,会检测这些存储系统上的写入冲突。
- 写入 Amazon S3 和兼容 S3 的存储必须使用
delta.enable-non-concurrent-writes
属性启用。从多个 Trino 集群写入 S3 是安全的;但是,当从其他 Delta Lake 引擎并发写入时,不会检测到写入冲突。您必须确保没有并发数据修改运行,以避免数据损坏。
数据类型映射
- Trino 和 Delta Lake 各自支持对方不支持的数据类型。因此,连接器在读取或写入数据时会修改某些类型。请参阅连接器文档中的 类型映射 以了解更多信息。
无需 Spark 的 Delta Lake:Amazon Athena
您可以使用 Athena 连接器 从 Amazon Athena 使用 Delta Lake。
请注意,您的 Delta Lake 表必须注册到 AWS Glue 元数据存储中。
如果您的表在 Amazon S3 中但不在 AWS Glue 中,请首先运行 CREATE EXTERNAL TABLE 语句:
CREATE EXTERNAL TABLE
[db_name.]table_name
LOCATION 's3://DOC-EXAMPLE-BUCKET/your-folder/'
TBLPROPERTIES ('table_type' = 'DELTA')
Delta Lake 表元数据从 Delta Lake 事务日志推断并直接同步到 AWS Glue。您无需提供列或模式定义。
然后您可以使用标准 SQL 语法查询您的 Delta 表。例如:
SELECT * FROM delta_table_users ORDER BY id;
您无需编写任何 Spark 即可将 Delta Lake 与 Amazon Athena 一起使用。
已知限制
无需 Spark 的 Delta Lake:Snowflake
您可以使用 Snowflake 连接器 从 Snowflake 使用 Delta Lake。
您需要创建一个指向存储在云存储中的 Delta Lake 的 Snowflake 外部表。支持的云存储服务包括:Amazon S3、Google Cloud Storage 和 Microsoft Azure。
然后您可以使用标准 SQL 语法查询您的 Delta 表。
例如,您可以按如下方式创建由 Delta Lake 支持的外部表:
CREATE EXTERNAL TABLE twitter_feed(
PARTITION BY (date_part)
LOCATION=@mystage/daily/
FILE_FORMAT = (TYPE = PARQUET)
TABLE_FORMAT = DELTA;
请注意 FILE_FORMAT = (TYPE = PARQUET)
和 TABLE_FORMAT = DELTA
。这些值必须这样设置。
为了获得最佳性能,建议为外部表定义分区列。在此示例中,我们已将 date_part
定义为分区列。
您无需编写任何 Spark 即可将 Delta Lake 与 Snowflake 一起使用。
已知限制
- 此连接器目前是预览功能。
- 对于引用 Delta Lake 文件的外部表,无法自动刷新元数据。相反,请定期执行 ALTER EXTERNAL TABLE … REFRESH 语句以注册任何添加或删除的文件。
- 在引用 Delta Lake 时,不支持以下 Snowflake 参数:
AWS_SNS_TOPIC = 'string'
PATTERN = 'regex_pattern'
无需 Spark 的 Delta Lake:Google BigQuery
您可以使用 BigQuery 连接器 从 Google BigQuery 使用 Delta Lake。
您需要将现有 Delta 表定义为 BigQuery 中的外部表。这被称为 Delta Lake BigLake。
您可以按如下方式操作:
CREATE EXTERNAL TABLE `PROJECT_ID.DATASET.DELTALAKE_TABLE_NAME`
WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
OPTIONS (
format ="DELTA_LAKE",
uris=['DELTA_TABLE_GCS_BASE_PATH']);
创建 Delta Lake BigLake 后,您可以使用 GoogleSQL 查询它。例如:
SELECT field1, field2 FROM mydataset.my_cloud_storage_table;
您无需编写任何 Spark 即可将 Delta Lake 与 Google BigQuery 一起使用。
在 专用博客文章 中阅读更多内容。
已知限制
- 此连接器作为预发布功能提供。预发布功能按“原样”提供,可能支持有限。
- 支持带有删除向量和列映射的 Delta Lake 读取器版本 3。
- 您必须在最后一个日志条目文件中列出读取器版本。例如,新表必须包含
00000..0.json
。 - 不支持变更数据捕获 (CDC) 操作。任何现有 CDC 操作都将被忽略。
- 模式是自动检测的。不支持使用 BigQuery 修改模式。
- 表列名必须符合 BigQuery 列名限制。
- 不支持物化视图。
- 数据类型可能会根据 类型映射矩阵 进行转换。
无需 Spark 的 Delta Lake 与 delta-rs
delta-rs 库允许您在没有 Spark 或 Java 的情况下使用 Python 或 Rust 读取、写入和管理 Delta Lake 表。它在底层使用 Apache Arrow,因此与 pandas、DuckDB 和 Polars 等其他 Arrow 原生或集成库兼容。
使用 delta-rs 结合 Delta Lake 完全避免了 JVM。
delta-rs 有两个公共 API:
- “rust deltalake”指 delta-rs 的 Rust API
- “python deltalake”指 delta-rs 的 Python API
python deltalake API 允许您从许多开源查询引擎使用 Delta Lake,包括:
- pandas
- polars
- Dask
- Daft
- DuckDB
- Datafusion
有关更多信息,请参阅 delta-rs 文档中的 集成页面。
无需 Spark 的 Delta Lake:pandas
您可以使用 delta-rs 与 pandas 一起使用 Delta Lake。让我们看一个示例。
首先按如下方式导入 delta-rs 和 pandas:
import pandas as pd
from deltalake import write_deltalake, DeltaTable
定义两个字典来存储一些数据:
data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}
使用第一个字典创建一个 DataFrame 并将其写入 Delta Lake 表:
df = pd.DataFrame.from_dict(data)
write_deltalake("tmp/pandas-table", df)
加载 Delta 表以检查结果:
> DeltaTable("tmp/pandas-table/").to_pandas()
first_name age
0 bob 47
1 li 23
2 leah 51
让我们追加其余数据:
df2 = pd.DataFrame(data_2)
write_deltalake("tmp/pandas-table", df2, mode="append")
重新读取以再次检查:
> DeltaTable("tmp/pandas-table/").to_pandas()
first_name age
0 bob 47
1 li 23
2 leah 51
3 suh 33
4 anais 68
您可以使用 version 关键字回溯到以前的版本:
> DeltaTable("tmp/pandas-table/", version=0).to_pandas()
first_name age
0 bob 47
1 li 23
2 leah 51
有关更多信息,请参阅 delta-rs 文档中的 Pandas 集成页面。
无需 Spark 的 Delta Lake:Polars
您可以使用 delta-rs 与 Polars 一起使用 Delta Lake。让我们看一个示例。
首先导入 polars:
import polars as pl
定义两个字典来存储一些数据:
data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}
使用第一个字典创建一个 DataFrame 并将其写入 Delta Lake 表:
df = pl.DataFrame(data)
df.write_delta("tmp/polars_table")
读取 Delta 表并打印 DataFrame 以进行可视化:
> print(pl.read_delta("tmp/polars_table"))
┌────────────┬─────┐
│ first_name ┆ age │
│ --- ┆ --- │
│ str ┆ i64 │
╞════════════╪═════╡
│ bob ┆ 47 │
│ li ┆ 23 │
│ leah ┆ 51 │
└────────────┴─────┘
使用第二个字典创建另一个 DataFrame 并将其追加到第一个:
df = pl.DataFrame(data_2)
df.write_delta("tmp/polars_table", mode="append")
读取并可视化:
> print(pl.read_delta("tmp/polars_table"))
┌────────────┬─────┐
│ first_name ┆ age │
│ --- ┆ --- │
│ str ┆ i64 │
╞════════════╪═════╡
│ suh ┆ 33 │
│ anais ┆ 68 │
│ bob ┆ 47 │
│ li ┆ 23 │
│ leah ┆ 51 │
└────────────┴─────┘
使用时间旅行功能回溯到早期版本:
> print(pl.read_delta("tmp/polars_table", version=0))
┌────────────┬─────┐
│ first_name ┆ age │
│ --- ┆ --- │
│ str ┆ i64 │
╞════════════╪═════╡
│ bob ┆ 47 │
│ li ┆ 23 │
│ leah ┆ 51 │
└────────────┴─────┘
请注意,与 pandas 不同,polars 有自己的 read_delta
和 write_delta
方法。这意味着您无需显式导入 deltalake
;它作为依赖项在 Polars
内部使用。
有关更多信息,请参阅 delta-rs 文档中的 Polars 集成页面。
无需 Spark 的 Delta Lake:Dask
您可以使用 delta-rs 与 Dask 一起使用 Delta Lake。此功能可通过 dask-deltatable
库获得。
请注意,dask-deltatable
仅适用于 deltalake==0.13.0
让我们看一个示例。
如果您正在运行 dask >= 2024.3.0
,您将不得不禁用 Dask 的新查询规划器才能与 dask-deltatable
配合使用。您可以通过设置以下内容来完成:
dask.config.set({'dataframe.query-planning': False})
这仅在您在导入 dask-deltatable
之前设置配置时才有效。有关查询规划器的更多信息,请参阅 Dask 文档。
接下来,导入 dask-deltatable
和 dask.dataframe
:
import dask_deltatable as ddt
import dask.dataframe as dd
定义两个带有测试数据的字典:
data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}
让我们从第一个字典创建一个 Dask DataFrame:
> ddf = dd.from_dict(data, npartitions=1)
> ddf.compute()
first_name age
0 bob 47
1 li 23
2 leah 51
现在,将此 Dask DataFrame 写入 Delta 表:
ddt.to_deltalake("tmp/dask-table", ddf)
并重新读取以确认:
> delta_path = "tmp/dask-table/"
> ddf = ddt.read_deltalake(delta_path)
> ddf.compute()
first_name age
0 bob 47
1 li 23
2 leah 51
让我们创建第二个 DataFrame,其中包含要追加到 Delta 表的数据:
> ddf_2 = dd.from_dict(data_2, npartitions=1)
> ddf_2.compute()
first_name age
0 suh 33
1 anais 68
并以 append
模式执行写入以将其添加到现有 Delta 表中:
ddt.to_deltalake("tmp/dask-table", ddf_2, mode="append")
重新读取以确认:
> delta_path = "tmp/dask-table/"
> ddf = ddt.read_deltalake(delta_path)
> ddf.compute()
first_name age
0 bob 47
1 li 23
2 leah 51
0 suh 33
1 anais 68
太棒了。
您还可以使用 version
关键字参数回溯到 Delta 表的早期版本:
> delta_path = "tmp/dask-table/"
> ddf = ddt.read_deltalake(delta_path, version=0)
> print(ddf.compute())
first_name age
0 bob 47
1 li 23
2 leah 51
有关更多信息,请参阅 delta-rs 文档中的 Dask 集成页面。
无需 Spark 的 Delta Lake:Daft
您可以使用 delta-rs 与 Daft 一起使用 Delta Lake。
Daft 目前支持没有时间旅行的读取操作。写入操作可在 非公开预览 API 功能 中获得。
让我们看一个示例。
您可以按如下方式将现有 Delta 表读取到 Daft DataFrame 中:
> df = daft.read_delta_lake("tmp/pandas-table")
> df.collect()
╭────────────┬───────╮
│ first_name ┆ age │
│ --- ┆ --- │
│ Utf8 ┆ Int64 │
╞════════════╪═══════╡
│ bob ┆ 47 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ li ┆ 23 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ leah ┆ 51 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ suh ┆ 33 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ anais ┆ 68 │
╰────────────┴───────╯
然后您可以查询或转换数据。例如:
> df.where(df["age"] > 40).collect()
╭────────────┬───────╮
│ first_name ┆ age │
│ --- ┆ --- │
│ Utf8 ┆ Int64 │
╞════════════╪═══════╡
│ bob ┆ 47 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ leah ┆ 51 │
├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ anais ┆ 68 │
╰────────────┴───────╯
然后您可以使用 write_deltalake
将数据写入 Delta 表:
df.write_deltalake("tmp/daft-table", mode="overwrite")
Daft 支持多种写入模式。
有关更多信息,请参阅 delta-rs 文档中的 Daft 集成页面。
无需 Spark 的 Delta Lake:DuckDB
您可以使用 delta-rs 与 DuckDB 一起使用 Delta Lake。
Delta Lake 表可以作为 Arrow 表和 Arrow 数据集 公开,这允许与各种查询引擎进行互操作。您可以使用 Arrow 作为中间步骤来使用 Delta Lake 与 DuckDB。
让我们看一个示例。
首先导入 duckdb
和 deltalake
:
import duckdb
from deltalake import write_deltalake, DeltaTable
现在加载现有的 Delta 表,例如我们之前使用 pandas
创建的表:
dt = DeltaTable("tmp/pandas-table/")
将此 Delta 表转换为 DuckDB 数据集,使用 Arrow 数据集作为中间步骤:
arrow_data = dt.to_pyarrow_dataset()
duck_data = duckdb.arrow(arrow_data)
现在您可以查询您的 DuckDB 数据集:
> query = """
> select
> age
> from duck_data
> order by 1 desc
> """
> duckdb.query(query)
┌───────┐
│ age │
│ int64 │
├───────┤
│ 68 │
│ 51 │
│ 47 │
│ 33 │
│ 23 │
└───────┘
要将此数据写入 Delta 表,请首先将其转换为 Arrow 表。然后使用 write_deltalake
将其写入 Delta 表:
arrow_table = duckdb.query(query).to_arrow_table()
write_deltalake(
data=arrow_table,
table_or_uri="tmp/duckdb-table",
mode="overwrite",
)
重新读取以确认:
> dt = DeltaTable("tmp/duckdb-table/")
> dt.to_pandas()
age
0 68
1 51
2 47
3 33
4 23
太棒了。
现在让我们更新查询以将其限制为仅 3 条记录:
query = """
select
age
from duck_data
order by 1 desc
limit 3
"""
并覆盖现有 Delta 表:
arrow_table = duckdb.query(query).to_arrow_table()
write_deltalake(
data=arrow_table,
table_or_uri="tmp/duckdb-table",
mode="overwrite",
)
重新读取以确认:
> dt = DeltaTable("tmp/duckdb-table/")
> dt.to_pandas()
age
0 68
1 51
2 47
您可以使用 version
关键字参数在 Delta 表的不同版本之间进行时间旅行:
> dt = DeltaTable("tmp/duckdb-table/", version=0)
> dt.to_pandas()
age
0 68
1 51
2 47
3 33
4 23
还有一个用于 Delta Lake 的实验性 DuckDB 扩展。此扩展由 DuckDB 维护,您可以在 Github 存储库 中阅读相关内容。该扩展目前仅支持读取操作。
无需 Spark 的 Delta Lake:Datafusion
您可以使用 delta-rs 与 Datafusion 一起使用 Delta Lake。您可以使用 Arrow 作为中间步骤来使用 Delta Lake 与 DuckDB。
让我们看一个示例。
首先导入 datafusion
和 deltalake
:
from datafusion import SessionContext
from deltalake import write_deltalake, DeltaTable
初始化 Datafusion 会话上下文:
ctx = SessionContext()
现在加载现有的 Delta 表,例如我们之前使用 pandas
创建的表:
table = DeltaTable("tmp/pandas-table/")
将此 Delta 表转换为 PyArrow 数据集并将其注册为 Datafusion 表:
arrow_data = table.to_pyarrow_dataset()
ctx.register_dataset("my_delta_table", arrow_data)
现在您可以查询您的 Datafusion 数据集:
> query = "select age from my_delta_table order by 1 desc"
> ctx.sql(query)
DataFrame()
+-----+
| age |
+-----+
| 68 |
| 51 |
| 47 |
| 33 |
| 23 |
+-----+
要将此数据写入 Delta 表,请首先将其转换为 Arrow 表。然后使用 write_deltalake
将其写入 Delta 表:
arrow_table = ctx.sql(query).to_arrow_table()
write_deltalake(
data=arrow_table,
table_or_uri="tmp/datafusion-table",
)
重新读取以确认:
> dt = DeltaTable("tmp/datafusion-table/")
> dt.to_pandas()
age
0 68
1 51
2 47
3 33
4 23
太棒了。
现在让我们更新查询以将其限制为仅 3 条记录:
query =
"select age from my_delta_table order by 1 desc limit 3"
并覆盖现有 Delta 表:
arrow_table = ctx.sql(query).to_arrow_table()
write_deltalake(
data=arrow_table,
table_or_uri="tmp/datafusion-table",
mode="overwrite",
)
重新读取以确认:
> dt = DeltaTable("tmp/datafusion-table/")
> dt.to_pandas()
age
0 68
1 51
2 47
您可以使用 version
关键字参数在 Delta 表的不同版本之间进行时间旅行:
> dt = DeltaTable("tmp/datafusion-table/", version=0)
> dt.to_pandas()
age
0 68
1 51
2 47
3 33
4 23
有关更多信息,请参阅 delta-rs 文档中的 Datafusion 集成页面。
无需 Spark 的 Delta Lake:结论
有许多方法可以在不使用 Spark 的情况下使用 Delta Lake。
专用的 Delta 连接器 允许您从 Flink、Hive、Trino、PrestoDB 等引擎使用 Delta Lake。
delta-rs 包允许您在 Rust 或 Python 中使用 Delta Lake,例如与 pandas、polars、Dask、Daft、DuckDB 等一起使用。
查看 示例笔记本,亲自运行 delta-rs 代码。