无需 Spark 的 Delta Lake
作者:Avril Aysha
这篇文章将向您展示如何在不使用 Spark 的情况下使用 Delta Lake。
您可能希望在不使用 Spark 的情况下使用 Delta Lake,原因如下:
- 您不想学习 Spark
- 您的团队不使用 Spark
- 您不想使用 Java 虚拟机 (JVM)
- 您正在处理相对较小的数据集
您可以使用许多其他语言(如 SQL、Python 和 Rust)在不使用 Spark 的情况下使用 Delta Lake。本文将向您展示在不使用 Spark 的情况下使用 Delta Lake 的最流行方式的示例。
让我们开始吧!🪂
如何在不使用 Spark 的情况下使用 Delta Lake
有许多方法可以在不使用 Spark 的情况下使用 Delta Lake。
为了清晰起见,我们将其分为两类:
- 专用的 Delta 连接器 允许您从 Flink、Hive、Trino、PrestoDB 等引擎使用 Delta Lake
- delta-rs 包允许您在 Rust 或 Python 中使用 Delta Lake,例如与 pandas、polars、Dask、Daft、DuckDB 等一起使用
本文将向您展示每种选项的简短代码示例,以便在不使用 Spark 的情况下使用 Delta Lake。您还可以在 Delta Lake 网站上找到完整的集成列表。
无需 Spark 的 Delta Lake:专用连接器
许多非 Spark 查询引擎都有专用连接器来使用 Delta Lake。这些都基于 Delta Standalone:一个用于 Java/Scala 的 JVM 库,可用于读取和写入 Delta 表。您可以使用 Delta Standalone 为未在集成页面上列出的服务构建自己的 Delta 连接器。
注意:如果您想完全避免 JVM,请参阅下面的 delta-rs 部分
您可以通过以下专用 Delta 连接器在不使用 Spark 的情况下使用 Delta Lake:
- Apache Flink
- Apache Hive
- PrestoDB
- Trino
- Amazon Athena
- Snowflake
- Google BigQuery
- Microsoft Fabric
其中一些连接器支持有限的 Delta Lake 功能。请务必查看每个连接器的“已知限制”部分以了解更多信息。
无需 Spark 的 Delta Lake:Apache Flink

您可以使用 Flink/Delta 连接器 从 Apache Flink 使用 Delta Lake。该连接器支持批处理和流式模式的数据写入。
连接器包括:
- DeltaSink用于将数据从 Apache Flink 写入 Delta 表。
- DeltaSource用于使用 Apache Flink 读取 Delta 表。
您可以将 Delta Lake 与 Flink Python 或 SQL API 一起使用。
Delta Lake 与 Flink:Python
下面的代码是如何使用分区列 surname 将数据写入分区表的示例。
    import io.delta.flink.sink.DeltaBucketAssigner;
    import io.delta.flink.sink.DeltaSinkBuilder;
    public DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath) {
        String[] partitionCols = { "surname" };
        DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                new Path(deltaTablePath),
                new Configuration(),
                rowType)
            .withPartitionColumns(partitionCols)
            .build();
        stream.sinkTo(deltaSink);
        return stream;
    }您还可以从 Apache Flink 使用 Delta Lake 的时间旅行功能。例如,像这样:
    public DataStream<RowData> createBoundedDeltaSourceWithTimeTravel(
            StreamExecutionEnvironment env,
            String deltaTablePath) {
        DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                new Path(deltaTablePath),
                new Configuration())
            // could also use `.versionAsOf(314159)`
            .timestampAsOf("2022-06-28 04:55:00")
            .build();
        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }Delta Lake 与 Flink:SQL
从 Flink 3.0.0 版本开始,Delta 连接器可用于 Flink SQL 作业。Delta Source 和 Delta Sink 都可以用作 Flink 表,用于 SELECT 和 INSERT 查询。
例如,您可以使用以下方式将整个 Delta 表加载到另一个 Delta 表中:
    INSERT INTO sinkTable SELECT * FROM sourceTable;在上述 SQL 查询中,sourceTable 和 sinkTable 都指使用 Delta/Flink 连接器配置的 Delta 表。表模式必须匹配。
或者创建新的分区表:
    CREATE TABLE testTable (
        id BIGINT,
        data STRING,
        part_a STRING,
        part_b STRING
      )
      PARTITIONED BY (part_a, part_b);
      WITH (
        'connector' = 'delta',
        'table-path' = '<path-to-table>',
        '<arbitrary-user-define-table-property' = '<value>',
        '<delta.*-properties>' = '<value'>
    );您无需编写任何 Spark 即可将 Delta Lake 与 Apache Flink 一起使用。
请注意,Flink/Delta SQL 连接器必须与 Delta Catalog 一起使用。尝试在未配置 Delta Catalog 的情况下使用 Flink API 对 Delta 表执行 SQL 查询将导致 SQL 作业失败。
已知限制
- 目前仅支持 append写入操作;不支持overwrite或upsert。
- Azure Blob Storage 目前仅支持读取。由于类着色问题,Flink 不支持写入 Azure Blob Storage,请参见 此问题。
- 对于 AWS S3 存储,为了确保来自不同集群的并发事务写入,请使用 多集群配置指南。有关如何在 Flink Delta Sink 中使用此配置的示例,请参见 此示例。
- Delta SQL 连接器目前仅支持物理列。元数据和计算列目前不支持。有关详细信息,请参见 此处。
无需 Spark 的 Delta Lake:Apache Hive

您可以使用 Hive 连接器 从 Apache Hive 使用 Delta Lake。您可以使用此连接器查询 Hive 中的 Delta 表数据。您不能使用它将数据从 Hive 写入 Delta 表。
要使用 Delta Lake,您需要定义一个指向 Delta 表的外部 Hive 表,例如在 S3 上,像这样:
    CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
    STORED BY 'io.delta.hive.DeltaStorageHandler'
    LOCATION '/delta/table/path'CREATE TABLE 语句中的表模式应与您正在读取的 Delta 表的模式匹配。
定义外部 Hive 表后,您可以按如下方式查询它:
    select * from deltaTable;您无需编写任何 Spark 即可将 Delta Lake 与 Apache Hive 一起使用。
已知限制
- 此连接器是只读的。不支持任何写入操作。
- 仅支持外部 Hive 表。Delta 表必须在使用外部 Hive 表引用它之前使用 Spark 创建。
无需 Spark 的 Delta Lake:PrestoDB

您可以使用 PrestoDB 连接器 从 Presto 使用 Delta Lake。此连接器基于 Hive 连接器,并共享许多相同的配置选项。
您的 Delta 表需要注册到 Hive 元数据存储中。
您可以按如下方式从 S3 上现有的 Delta 表创建 Presto 表:
    CREATE TABLE sales.apac.sales_data_new (sampleColumn INT)
    WITH (external_location = 's3://db-sa-datasets/presto/sales_data_new');要在 Hive 元数据存储中注册表,您不需要传递表的完整模式,因为 Delta Lake 连接器从位于 Delta Lake 表位置的元数据中获取模式。为了避免 Hive 元数据存储中的 no columns error,请提供一个示例列作为正在注册的 Delta 表的模式。
要访问已作为 apac 数据库和 sales 目录的一部分注册到 Hive 元数据存储中的 Delta 表 sales_data,您可以简单地运行:
    SELECT * FROM sales.apac.sales_data LIMIT 200;您还可以通过传递 path 直接从 S3 查询 Delta 表:
    SELECT * FROM sales."$path$"."s3://db-sa-datasets/presto/sales_data" LIMIT 200;您可以通过将版本添加到表名来回溯到 Delta 表的特定版本,如下所示:
    SELECT * FROM sales.apac."sales_data@v4" LIMIT 200;已知限制
- 此连接器是只读的。不支持任何写入操作。
- 此连接器重用了 Hive 连接器中存在的许多模块,例如用于连接和安全性(如 S3、Azure Data Lake、AWS Glue 元数据存储等)。这些模块的配置与 Hive 连接器文档 中提供的配置相同。\
您无需编写任何 Spark 即可将 Delta Lake 与 PrestoDB 一起使用。
无需 Spark 的 Delta Lake:Trino

您可以使用 Trino 连接器 从 Trino 使用 Delta Lake。然后您可以使用 SQL 查询和转换您的 Delta 表。请注意,您的 Delta 表必须注册到元数据存储中,例如 Hive 元数据存储或 AWS Glue。
Trino 连接器支持读取和写入操作。您可以 追加、覆盖和合并 您的 Delta 表。
例如,您可以创建 Delta Lake 表,添加一些数据,修改一些数据,然后添加更多数据,像这样:
    CREATE TABLE users(id int, name varchar) WITH (column_mapping_mode = 'name');
    INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Mallory');
    ALTER TABLE users DROP COLUMN name;
    INSERT INTO users VALUES 4;使用以下语句查看表中的所有数据:
    > SELECT * FROM users ORDER BY id;
    id
    ----
      1
      2
      3
      4使用 $history 元数据表查看过去操作的记录:
    > SELECT version, timestamp, operation
    > FROM "users$history";
    version |             timestamp              |  operation
    ---------+------------------------------------+--------------
           0 | 2024-04-10 17:49:18.528 Asia/Tokyo | CREATE TABLE
           1 | 2024-04-10 17:49:18.755 Asia/Tokyo | WRITE
           2 | 2024-04-10 17:49:18.929 Asia/Tokyo | DROP COLUMNS
           3 | 2024-04-10 17:49:19.137 Asia/Tokyo | WRITE然后回溯到版本 1:
    > SELECT *
    > FROM users FOR VERSION AS OF 1;
    id |  name
    ----+---------
      1 | Alice
      2 | Bob
      3 | Mallory您无需编写任何 Spark 即可将 Delta Lake 与 Trino 一起使用。
写入云存储
- 默认情况下,写入 Azure ADLS Gen2 和 Google Cloud Storage 是启用的。Trino 在从多个 Trino 集群或其他查询引擎写入时,会检测这些存储系统上的写入冲突。
- 写入 Amazon S3 和兼容 S3 的存储必须使用 delta.enable-non-concurrent-writes属性启用。从多个 Trino 集群写入 S3 是安全的;但是,当从其他 Delta Lake 引擎并发写入时,不会检测到写入冲突。您必须确保没有并发数据修改运行,以避免数据损坏。
数据类型映射
- Trino 和 Delta Lake 各自支持对方不支持的数据类型。因此,连接器在读取或写入数据时会修改某些类型。请参阅连接器文档中的 类型映射 以了解更多信息。
无需 Spark 的 Delta Lake:Amazon Athena

您可以使用 Athena 连接器 从 Amazon Athena 使用 Delta Lake。
请注意,您的 Delta Lake 表必须注册到 AWS Glue 元数据存储中。
如果您的表在 Amazon S3 中但不在 AWS Glue 中,请首先运行 CREATE EXTERNAL TABLE 语句:
    CREATE EXTERNAL TABLE
      [db_name.]table_name
      LOCATION 's3://DOC-EXAMPLE-BUCKET/your-folder/'
      TBLPROPERTIES ('table_type' = 'DELTA')Delta Lake 表元数据从 Delta Lake 事务日志推断并直接同步到 AWS Glue。您无需提供列或模式定义。
然后您可以使用标准 SQL 语法查询您的 Delta 表。例如:
    SELECT * FROM delta_table_users ORDER BY id;您无需编写任何 Spark 即可将 Delta Lake 与 Amazon Athena 一起使用。
已知限制
无需 Spark 的 Delta Lake:Snowflake

您可以使用 Snowflake 连接器 从 Snowflake 使用 Delta Lake。
您需要创建一个指向存储在云存储中的 Delta Lake 的 Snowflake 外部表。支持的云存储服务包括:Amazon S3、Google Cloud Storage 和 Microsoft Azure。
然后您可以使用标准 SQL 语法查询您的 Delta 表。
例如,您可以按如下方式创建由 Delta Lake 支持的外部表:
    CREATE EXTERNAL TABLE twitter_feed(
     PARTITION BY (date_part)
     LOCATION=@mystage/daily/
     FILE_FORMAT = (TYPE = PARQUET)
     TABLE_FORMAT = DELTA;请注意 FILE_FORMAT = (TYPE = PARQUET) 和 TABLE_FORMAT = DELTA。这些值必须这样设置。
为了获得最佳性能,建议为外部表定义分区列。在此示例中,我们已将 date_part 定义为分区列。
您无需编写任何 Spark 即可将 Delta Lake 与 Snowflake 一起使用。
已知限制
- 此连接器目前是预览功能。
- 对于引用 Delta Lake 文件的外部表,无法自动刷新元数据。相反,请定期执行 ALTER EXTERNAL TABLE … REFRESH 语句以注册任何添加或删除的文件。
- 在引用 Delta Lake 时,不支持以下 Snowflake 参数:- AWS_SNS_TOPIC = 'string'
- PATTERN = 'regex_pattern'
 
无需 Spark 的 Delta Lake:Google BigQuery

您可以使用 BigQuery 连接器 从 Google BigQuery 使用 Delta Lake。
您需要将现有 Delta 表定义为 BigQuery 中的外部表。这被称为 Delta Lake BigLake。
您可以按如下方式操作:
    CREATE EXTERNAL TABLE `PROJECT_ID.DATASET.DELTALAKE_TABLE_NAME`
    WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      format ="DELTA_LAKE",
      uris=['DELTA_TABLE_GCS_BASE_PATH']);创建 Delta Lake BigLake 后,您可以使用 GoogleSQL 查询它。例如:
    SELECT field1, field2 FROM mydataset.my_cloud_storage_table;您无需编写任何 Spark 即可将 Delta Lake 与 Google BigQuery 一起使用。
在 专用博客文章 中阅读更多内容。
已知限制
- 此连接器作为预发布功能提供。预发布功能按“原样”提供,可能支持有限。
- 支持带有删除向量和列映射的 Delta Lake 读取器版本 3。
- 您必须在最后一个日志条目文件中列出读取器版本。例如,新表必须包含 00000..0.json。
- 不支持变更数据捕获 (CDC) 操作。任何现有 CDC 操作都将被忽略。
- 模式是自动检测的。不支持使用 BigQuery 修改模式。
- 表列名必须符合 BigQuery 列名限制。
- 不支持物化视图。
- 数据类型可能会根据 类型映射矩阵 进行转换。
无需 Spark 的 Delta Lake 与 delta-rs
delta-rs 库允许您在没有 Spark 或 Java 的情况下使用 Python 或 Rust 读取、写入和管理 Delta Lake 表。它在底层使用 Apache Arrow,因此与 pandas、DuckDB 和 Polars 等其他 Arrow 原生或集成库兼容。
使用 delta-rs 结合 Delta Lake 完全避免了 JVM。
delta-rs 有两个公共 API:
- “rust deltalake”指 delta-rs 的 Rust API
- “python deltalake”指 delta-rs 的 Python API
python deltalake API 允许您从许多开源查询引擎使用 Delta Lake,包括:
- pandas
- polars
- Dask
- Daft
- DuckDB
- Datafusion
有关更多信息,请参阅 delta-rs 文档中的 集成页面。
无需 Spark 的 Delta Lake:pandas

您可以使用 delta-rs 与 pandas 一起使用 Delta Lake。让我们看一个示例。
首先按如下方式导入 delta-rs 和 pandas:
    import pandas as pd
    from deltalake import write_deltalake, DeltaTable定义两个字典来存储一些数据:
    data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
    data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}使用第一个字典创建一个 DataFrame 并将其写入 Delta Lake 表:
    df = pd.DataFrame.from_dict(data)
    write_deltalake("tmp/pandas-table", df)加载 Delta 表以检查结果:
    > DeltaTable("tmp/pandas-table/").to_pandas()
     first_name  age
    0        bob   47
    1         li   23
    2       leah   51让我们追加其余数据:
    df2 = pd.DataFrame(data_2)
    write_deltalake("tmp/pandas-table", df2, mode="append")重新读取以再次检查:
    > DeltaTable("tmp/pandas-table/").to_pandas()
     first_name  age
    0        bob   47
    1         li   23
    2       leah   51
    3        suh   33
    4      anais   68您可以使用 version 关键字回溯到以前的版本:
    > DeltaTable("tmp/pandas-table/", version=0).to_pandas()
     first_name  age
    0        bob   47
    1         li   23
    2       leah   51有关更多信息,请参阅 delta-rs 文档中的 Pandas 集成页面。
无需 Spark 的 Delta Lake:Polars

您可以使用 delta-rs 与 Polars 一起使用 Delta Lake。让我们看一个示例。
首先导入 polars:
    import polars as pl定义两个字典来存储一些数据:
    data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
    data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}使用第一个字典创建一个 DataFrame 并将其写入 Delta Lake 表:
    df = pl.DataFrame(data)
    df.write_delta("tmp/polars_table")读取 Delta 表并打印 DataFrame 以进行可视化:
    > print(pl.read_delta("tmp/polars_table"))
    ┌────────────┬─────┐
    │ first_name ┆ age │
    │ ---        ┆ --- │
    │ str        ┆ i64 │
    ╞════════════╪═════╡
    │ bob        ┆ 47  │
    │ li         ┆ 23  │
    │ leah       ┆ 51  │
    └────────────┴─────┘使用第二个字典创建另一个 DataFrame 并将其追加到第一个:
    df = pl.DataFrame(data_2)
    df.write_delta("tmp/polars_table", mode="append")读取并可视化:
    > print(pl.read_delta("tmp/polars_table"))
    ┌────────────┬─────┐
    │ first_name ┆ age │
    │ ---        ┆ --- │
    │ str        ┆ i64 │
    ╞════════════╪═════╡
    │ suh        ┆ 33  │
    │ anais      ┆ 68  │
    │ bob        ┆ 47  │
    │ li         ┆ 23  │
    │ leah       ┆ 51  │
    └────────────┴─────┘使用时间旅行功能回溯到早期版本:
    > print(pl.read_delta("tmp/polars_table", version=0))
    ┌────────────┬─────┐
    │ first_name ┆ age │
    │ ---        ┆ --- │
    │ str        ┆ i64 │
    ╞════════════╪═════╡
    │ bob        ┆ 47  │
    │ li         ┆ 23  │
    │ leah       ┆ 51  │
    └────────────┴─────┘请注意,与 pandas 不同,polars 有自己的 read_delta 和 write_delta 方法。这意味着您无需显式导入 deltalake;它作为依赖项在 Polars 内部使用。
有关更多信息,请参阅 delta-rs 文档中的 Polars 集成页面。
无需 Spark 的 Delta Lake:Dask

您可以使用 delta-rs 与 Dask 一起使用 Delta Lake。此功能可通过 dask-deltatable 库获得。
请注意,dask-deltatable 仅适用于 deltalake==0.13.0
让我们看一个示例。
如果您正在运行 dask >= 2024.3.0,您将不得不禁用 Dask 的新查询规划器才能与 dask-deltatable 配合使用。您可以通过设置以下内容来完成:
    dask.config.set({'dataframe.query-planning': False})这仅在您在导入 dask-deltatable 之前设置配置时才有效。有关查询规划器的更多信息,请参阅 Dask 文档。
接下来,导入 dask-deltatable 和 dask.dataframe:
    import dask_deltatable as ddt
    import dask.dataframe as dd定义两个带有测试数据的字典:
    data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
    data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}让我们从第一个字典创建一个 Dask DataFrame:
    > ddf = dd.from_dict(data, npartitions=1)
> ddf.compute()
     first_name  age
    0        bob   47
    1         li   23
    2       leah   51现在,将此 Dask DataFrame 写入 Delta 表:
    ddt.to_deltalake("tmp/dask-table", ddf)并重新读取以确认:
    > delta_path = "tmp/dask-table/"
    > ddf = ddt.read_deltalake(delta_path)
    > ddf.compute()
     first_name  age
    0        bob   47
    1         li   23
    2       leah   51让我们创建第二个 DataFrame,其中包含要追加到 Delta 表的数据:
    > ddf_2 = dd.from_dict(data_2, npartitions=1)
    > ddf_2.compute()
    	 first_name  age
    0        suh   33
    1      anais   68并以 append 模式执行写入以将其添加到现有 Delta 表中:
    ddt.to_deltalake("tmp/dask-table", ddf_2, mode="append")重新读取以确认:
    > delta_path = "tmp/dask-table/"
    > ddf = ddt.read_deltalake(delta_path)
    > ddf.compute()
     first_name  age
    0        bob   47
    1         li   23
    2       leah   51
    0        suh   33
    1      anais   68太棒了。
您还可以使用 version 关键字参数回溯到 Delta 表的早期版本:
    > delta_path = "tmp/dask-table/"
    > ddf = ddt.read_deltalake(delta_path, version=0)
    > print(ddf.compute())
     first_name  age
    0        bob   47
    1         li   23
    2       leah   51有关更多信息,请参阅 delta-rs 文档中的 Dask 集成页面。
无需 Spark 的 Delta Lake:Daft

您可以使用 delta-rs 与 Daft 一起使用 Delta Lake。
Daft 目前支持没有时间旅行的读取操作。写入操作可在 非公开预览 API 功能 中获得。
让我们看一个示例。
您可以按如下方式将现有 Delta 表读取到 Daft DataFrame 中:
    > df = daft.read_delta_lake("tmp/pandas-table")
    > df.collect()
    ╭────────────┬───────╮
    │ first_name ┆ age   │
    │ ---        ┆ ---   │
    │ Utf8       ┆ Int64 │
    ╞════════════╪═══════╡
    │ bob        ┆ 47    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ li         ┆ 23    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ leah       ┆ 51    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ suh        ┆ 33    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ anais      ┆ 68    │
    ╰────────────┴───────╯然后您可以查询或转换数据。例如:
    > df.where(df["age"] > 40).collect()
    ╭────────────┬───────╮
    │ first_name ┆ age   │
    │ ---        ┆ ---   │
    │ Utf8       ┆ Int64 │
    ╞════════════╪═══════╡
    │ bob        ┆ 47    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ leah       ┆ 51    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ anais      ┆ 68    │
    ╰────────────┴───────╯然后您可以使用 write_deltalake 将数据写入 Delta 表:
df.write_deltalake("tmp/daft-table", mode="overwrite")Daft 支持多种写入模式。
有关更多信息,请参阅 delta-rs 文档中的 Daft 集成页面。
无需 Spark 的 Delta Lake:DuckDB

您可以使用 delta-rs 与 DuckDB 一起使用 Delta Lake。
Delta Lake 表可以作为 Arrow 表和 Arrow 数据集 公开,这允许与各种查询引擎进行互操作。您可以使用 Arrow 作为中间步骤来使用 Delta Lake 与 DuckDB。
让我们看一个示例。
首先导入 duckdb 和 deltalake:
    import duckdb
    from deltalake import write_deltalake, DeltaTable现在加载现有的 Delta 表,例如我们之前使用 pandas 创建的表:
    dt = DeltaTable("tmp/pandas-table/")将此 Delta 表转换为 DuckDB 数据集,使用 Arrow 数据集作为中间步骤:
    arrow_data = dt.to_pyarrow_dataset()
    duck_data = duckdb.arrow(arrow_data)现在您可以查询您的 DuckDB 数据集:
    > query = """
    > select
    >  age
    > from duck_data
    > order by 1 desc
    > """
    > duckdb.query(query)
    ┌───────┐
    │  age  │
    │ int64 │
    ├───────┤
    │    68 │
    │    51 │
    │    47 │
    │    33 │
    │    23 │
    └───────┘要将此数据写入 Delta 表,请首先将其转换为 Arrow 表。然后使用 write_deltalake 将其写入 Delta 表:
    arrow_table = duckdb.query(query).to_arrow_table()
    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/duckdb-table",
        mode="overwrite",
    )重新读取以确认:
    > dt = DeltaTable("tmp/duckdb-table/")
    > dt.to_pandas()
      age
    0   68
    1   51
    2   47
    3   33
    4   23太棒了。
现在让我们更新查询以将其限制为仅 3 条记录:
    query = """
    select
      age
    from duck_data
    order by 1 desc
    limit 3
    """并覆盖现有 Delta 表:
    arrow_table = duckdb.query(query).to_arrow_table()
    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/duckdb-table",
        mode="overwrite",
    )重新读取以确认:
    > dt = DeltaTable("tmp/duckdb-table/")
    > dt.to_pandas()
      age
    0   68
    1   51
    2   47您可以使用 version 关键字参数在 Delta 表的不同版本之间进行时间旅行:
    > dt = DeltaTable("tmp/duckdb-table/", version=0)
    > dt.to_pandas()
      age
    0   68
    1   51
    2   47
    3   33
    4   23还有一个用于 Delta Lake 的实验性 DuckDB 扩展。此扩展由 DuckDB 维护,您可以在 Github 存储库 中阅读相关内容。该扩展目前仅支持读取操作。
无需 Spark 的 Delta Lake:Datafusion

您可以使用 delta-rs 与 Datafusion 一起使用 Delta Lake。您可以使用 Arrow 作为中间步骤来使用 Delta Lake 与 DuckDB。
让我们看一个示例。
首先导入 datafusion 和 deltalake:
    from datafusion import SessionContext
    from deltalake import write_deltalake, DeltaTable初始化 Datafusion 会话上下文:
    ctx = SessionContext()现在加载现有的 Delta 表,例如我们之前使用 pandas 创建的表:
    table = DeltaTable("tmp/pandas-table/")将此 Delta 表转换为 PyArrow 数据集并将其注册为 Datafusion 表:
    arrow_data = table.to_pyarrow_dataset()
    ctx.register_dataset("my_delta_table", arrow_data)现在您可以查询您的 Datafusion 数据集:
    > query = "select age from my_delta_table order by 1 desc"
    > ctx.sql(query)
    DataFrame()
    +-----+
    | age |
    +-----+
    | 68  |
    | 51  |
    | 47  |
    | 33  |
    | 23  |
    +-----+要将此数据写入 Delta 表,请首先将其转换为 Arrow 表。然后使用 write_deltalake 将其写入 Delta 表:
    arrow_table = ctx.sql(query).to_arrow_table()
    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/datafusion-table",
    )重新读取以确认:
    > dt = DeltaTable("tmp/datafusion-table/")
    > dt.to_pandas()
      age
    0   68
    1   51
    2   47
    3   33
    4   23太棒了。
现在让我们更新查询以将其限制为仅 3 条记录:
    query =
    "select age from my_delta_table order by 1 desc limit 3"并覆盖现有 Delta 表:
    arrow_table = ctx.sql(query).to_arrow_table()
    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/datafusion-table",
        mode="overwrite",
    )重新读取以确认:
    > dt = DeltaTable("tmp/datafusion-table/")
    > dt.to_pandas()
      age
    0   68
    1   51
    2   47您可以使用 version 关键字参数在 Delta 表的不同版本之间进行时间旅行:
    > dt = DeltaTable("tmp/datafusion-table/", version=0)
    > dt.to_pandas()
      age
    0   68
    1   51
    2   47
    3   33
    4   23有关更多信息,请参阅 delta-rs 文档中的 Datafusion 集成页面。
无需 Spark 的 Delta Lake:结论
有许多方法可以在不使用 Spark 的情况下使用 Delta Lake。
专用的 Delta 连接器 允许您从 Flink、Hive、Trino、PrestoDB 等引擎使用 Delta Lake。
delta-rs 包允许您在 Rust 或 Python 中使用 Delta Lake,例如与 pandas、polars、Dask、Daft、DuckDB 等一起使用。
查看 示例笔记本,亲自运行 delta-rs 代码。