The Linux Foundation Projects
Delta Lake

无需 Spark 的 Delta Lake

作者:Avril Aysha

这篇文章将向您展示如何在不使用 Spark 的情况下使用 Delta Lake。

您可能希望在不使用 Spark 的情况下使用 Delta Lake,原因如下:

  • 您不想学习 Spark
  • 您的团队不使用 Spark
  • 您不想使用 Java 虚拟机 (JVM)
  • 您正在处理相对较小的数据集

您可以使用许多其他语言(如 SQL、Python 和 Rust)在不使用 Spark 的情况下使用 Delta Lake。本文将向您展示在不使用 Spark 的情况下使用 Delta Lake 的最流行方式的示例。

让我们开始吧!🪂

如何在不使用 Spark 的情况下使用 Delta Lake

有许多方法可以在不使用 Spark 的情况下使用 Delta Lake。

为了清晰起见,我们将其分为两类:

  • 专用的 Delta 连接器 允许您从 Flink、Hive、Trino、PrestoDB 等引擎使用 Delta Lake
  • delta-rs 包允许您在 Rust 或 Python 中使用 Delta Lake,例如与 pandas、polars、Dask、Daft、DuckDB 等一起使用

本文将向您展示每种选项的简短代码示例,以便在不使用 Spark 的情况下使用 Delta Lake。您还可以在 Delta Lake 网站上找到完整的集成列表

无需 Spark 的 Delta Lake:专用连接器

许多非 Spark 查询引擎都有专用连接器来使用 Delta Lake。这些都基于 Delta Standalone:一个用于 Java/Scala 的 JVM 库,可用于读取和写入 Delta 表。您可以使用 Delta Standalone 为未在集成页面上列出的服务构建自己的 Delta 连接器。

注意:如果您想完全避免 JVM,请参阅下面的 delta-rs 部分

您可以通过以下专用 Delta 连接器在不使用 Spark 的情况下使用 Delta Lake:

  • Apache Flink
  • Apache Hive
  • PrestoDB
  • Trino
  • Amazon Athena
  • Snowflake
  • Google BigQuery
  • Microsoft Fabric

其中一些连接器支持有限的 Delta Lake 功能。请务必查看每个连接器的“已知限制”部分以了解更多信息。

您可以使用 Flink/Delta 连接器 从 Apache Flink 使用 Delta Lake。该连接器支持批处理和流式模式的数据写入。

连接器包括:

  • DeltaSink 用于将数据从 Apache Flink 写入 Delta 表。
  • DeltaSource 用于使用 Apache Flink 读取 Delta 表。

您可以将 Delta Lake 与 Flink Python 或 SQL API 一起使用。

下面的代码是如何使用分区列 surname 将数据写入分区表的示例。

    import io.delta.flink.sink.DeltaBucketAssigner;
    import io.delta.flink.sink.DeltaSinkBuilder;

    public DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath) {
        String[] partitionCols = { "surname" };
        DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                new Path(deltaTablePath),
                new Configuration(),
                rowType)
            .withPartitionColumns(partitionCols)
            .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

您还可以从 Apache Flink 使用 Delta Lake 的时间旅行功能。例如,像这样:

    public DataStream<RowData> createBoundedDeltaSourceWithTimeTravel(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                new Path(deltaTablePath),
                new Configuration())
            // could also use `.versionAsOf(314159)`
            .timestampAsOf("2022-06-28 04:55:00")
            .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

从 Flink 3.0.0 版本开始,Delta 连接器可用于 Flink SQL 作业。Delta Source 和 Delta Sink 都可以用作 Flink 表,用于 SELECT 和 INSERT 查询。

例如,您可以使用以下方式将整个 Delta 表加载到另一个 Delta 表中:

    INSERT INTO sinkTable SELECT * FROM sourceTable;

在上述 SQL 查询中,sourceTablesinkTable 都指使用 Delta/Flink 连接器配置的 Delta 表。表模式必须匹配。

或者创建新的分区表:

    CREATE TABLE testTable (
        id BIGINT,
        data STRING,
        part_a STRING,
        part_b STRING
      )
      PARTITIONED BY (part_a, part_b);
      WITH (
        'connector' = 'delta',
        'table-path' = '<path-to-table>',
        '<arbitrary-user-define-table-property' = '<value>',
        '<delta.*-properties>' = '<value'>
    );

您无需编写任何 Spark 即可将 Delta Lake 与 Apache Flink 一起使用。

请注意,Flink/Delta SQL 连接器必须与 Delta Catalog 一起使用。尝试在未配置 Delta Catalog 的情况下使用 Flink API 对 Delta 表执行 SQL 查询将导致 SQL 作业失败。

已知限制

  • 目前仅支持 append 写入操作;不支持 overwriteupsert
  • Azure Blob Storage 目前仅支持读取。由于类着色问题,Flink 不支持写入 Azure Blob Storage,请参见 此问题
  • 对于 AWS S3 存储,为了确保来自不同集群的并发事务写入,请使用 多集群配置指南。有关如何在 Flink Delta Sink 中使用此配置的示例,请参见 此示例
  • Delta SQL 连接器目前仅支持物理列。元数据和计算列目前不支持。有关详细信息,请参见 此处

无需 Spark 的 Delta Lake:Apache Hive

您可以使用 Hive 连接器 从 Apache Hive 使用 Delta Lake。您可以使用此连接器查询 Hive 中的 Delta 表数据。您不能使用它将数据从 Hive 写入 Delta 表。

要使用 Delta Lake,您需要定义一个指向 Delta 表的外部 Hive 表,例如在 S3 上,像这样:

    CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
    STORED BY 'io.delta.hive.DeltaStorageHandler'
    LOCATION '/delta/table/path'

CREATE TABLE 语句中的表模式应与您正在读取的 Delta 表的模式匹配。

定义外部 Hive 表后,您可以按如下方式查询它:

    select * from deltaTable;

您无需编写任何 Spark 即可将 Delta Lake 与 Apache Hive 一起使用。

已知限制

  • 此连接器是只读的。不支持任何写入操作。
  • 仅支持外部 Hive 表。Delta 表必须在使用外部 Hive 表引用它之前使用 Spark 创建。

无需 Spark 的 Delta Lake:PrestoDB

您可以使用 PrestoDB 连接器 从 Presto 使用 Delta Lake。此连接器基于 Hive 连接器,并共享许多相同的配置选项。

您的 Delta 表需要注册到 Hive 元数据存储中。

您可以按如下方式从 S3 上现有的 Delta 表创建 Presto 表:

    CREATE TABLE sales.apac.sales_data_new (sampleColumn INT)
    WITH (external_location = 's3://db-sa-datasets/presto/sales_data_new');

要在 Hive 元数据存储中注册表,您不需要传递表的完整模式,因为 Delta Lake 连接器从位于 Delta Lake 表位置的元数据中获取模式。为了避免 Hive 元数据存储中的 no columns error,请提供一个示例列作为正在注册的 Delta 表的模式。

要访问已作为 apac 数据库和 sales 目录的一部分注册到 Hive 元数据存储中的 Delta 表 sales_data,您可以简单地运行:

    SELECT * FROM sales.apac.sales_data LIMIT 200;

您还可以通过传递 path 直接从 S3 查询 Delta 表:

    SELECT * FROM sales."$path$"."s3://db-sa-datasets/presto/sales_data" LIMIT 200;

您可以通过将版本添加到表名来回溯到 Delta 表的特定版本,如下所示:

    SELECT * FROM sales.apac."sales_data@v4" LIMIT 200;

已知限制

  • 此连接器是只读的。不支持任何写入操作。
  • 此连接器重用了 Hive 连接器中存在的许多模块,例如用于连接和安全性(如 S3、Azure Data Lake、AWS Glue 元数据存储等)。这些模块的配置与 Hive 连接器文档 中提供的配置相同。\

您无需编写任何 Spark 即可将 Delta Lake 与 PrestoDB 一起使用。

无需 Spark 的 Delta Lake:Trino

您可以使用 Trino 连接器 从 Trino 使用 Delta Lake。然后您可以使用 SQL 查询和转换您的 Delta 表。请注意,您的 Delta 表必须注册到元数据存储中,例如 Hive 元数据存储或 AWS Glue。

Trino 连接器支持读取和写入操作。您可以 追加、覆盖和合并 您的 Delta 表。

例如,您可以创建 Delta Lake 表,添加一些数据,修改一些数据,然后添加更多数据,像这样:

    CREATE TABLE users(id int, name varchar) WITH (column_mapping_mode = 'name');
    INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Mallory');
    ALTER TABLE users DROP COLUMN name;
    INSERT INTO users VALUES 4;

使用以下语句查看表中的所有数据:

    > SELECT * FROM users ORDER BY id;

    id
    ----
      1
      2
      3
      4

使用 $history 元数据表查看过去操作的记录:

    > SELECT version, timestamp, operation
    > FROM "users$history";

    version |             timestamp              |  operation
    ---------+------------------------------------+--------------
           0 | 2024-04-10 17:49:18.528 Asia/Tokyo | CREATE TABLE
           1 | 2024-04-10 17:49:18.755 Asia/Tokyo | WRITE
           2 | 2024-04-10 17:49:18.929 Asia/Tokyo | DROP COLUMNS
           3 | 2024-04-10 17:49:19.137 Asia/Tokyo | WRITE

然后回溯到版本 1:

    > SELECT *
    > FROM users FOR VERSION AS OF 1;

    id |  name
    ----+---------
      1 | Alice
      2 | Bob
      3 | Mallory

您无需编写任何 Spark 即可将 Delta Lake 与 Trino 一起使用。

写入云存储

  • 默认情况下,写入 Azure ADLS Gen2 和 Google Cloud Storage 是启用的。Trino 在从多个 Trino 集群或其他查询引擎写入时,会检测这些存储系统上的写入冲突。
  • 写入 Amazon S3 和兼容 S3 的存储必须使用 delta.enable-non-concurrent-writes 属性启用。从多个 Trino 集群写入 S3 是安全的;但是,当从其他 Delta Lake 引擎并发写入时,不会检测到写入冲突。您必须确保没有并发数据修改运行,以避免数据损坏。

数据类型映射

  • Trino 和 Delta Lake 各自支持对方不支持的数据类型。因此,连接器在读取或写入数据时会修改某些类型。请参阅连接器文档中的 类型映射 以了解更多信息。

无需 Spark 的 Delta Lake:Amazon Athena

您可以使用 Athena 连接器 从 Amazon Athena 使用 Delta Lake。

请注意,您的 Delta Lake 表必须注册到 AWS Glue 元数据存储中。

如果您的表在 Amazon S3 中但不在 AWS Glue 中,请首先运行 CREATE EXTERNAL TABLE 语句:

    CREATE EXTERNAL TABLE
      [db_name.]table_name
      LOCATION 's3://DOC-EXAMPLE-BUCKET/your-folder/'
      TBLPROPERTIES ('table_type' = 'DELTA')

Delta Lake 表元数据从 Delta Lake 事务日志推断并直接同步到 AWS Glue。您无需提供列或模式定义。

然后您可以使用标准 SQL 语法查询您的 Delta 表。例如:

    SELECT * FROM delta_table_users ORDER BY id;

您无需编写任何 Spark 即可将 Delta Lake 与 Amazon Athena 一起使用。

已知限制

  • 此连接器是只读的。不支持任何写入操作或时间旅行。
  • 有关使用 Athena 和 AWS Glue 处理 UPSERT 的示例,请查看 这篇文章
  • 只有某些数据类型可用于分区列,请参阅 文档

无需 Spark 的 Delta Lake:Snowflake

您可以使用 Snowflake 连接器 从 Snowflake 使用 Delta Lake。

您需要创建一个指向存储在云存储中的 Delta Lake 的 Snowflake 外部表。支持的云存储服务包括:Amazon S3、Google Cloud Storage 和 Microsoft Azure。

然后您可以使用标准 SQL 语法查询您的 Delta 表。

例如,您可以按如下方式创建由 Delta Lake 支持的外部表:

    CREATE EXTERNAL TABLE twitter_feed(
     PARTITION BY (date_part)
     LOCATION=@mystage/daily/
     FILE_FORMAT = (TYPE = PARQUET)
     TABLE_FORMAT = DELTA;

请注意 FILE_FORMAT = (TYPE = PARQUET)TABLE_FORMAT = DELTA。这些值必须这样设置。

为了获得最佳性能,建议为外部表定义分区列。在此示例中,我们已将 date_part 定义为分区列。

您无需编写任何 Spark 即可将 Delta Lake 与 Snowflake 一起使用。

已知限制

  • 此连接器目前是预览功能。
  • 对于引用 Delta Lake 文件的外部表,无法自动刷新元数据。相反,请定期执行 ALTER EXTERNAL TABLE … REFRESH 语句以注册任何添加或删除的文件。
  • 在引用 Delta Lake 时,不支持以下 Snowflake 参数:
    • AWS_SNS_TOPIC = 'string'
    • PATTERN = 'regex_pattern'

无需 Spark 的 Delta Lake:Google BigQuery

您可以使用 BigQuery 连接器 从 Google BigQuery 使用 Delta Lake。

您需要将现有 Delta 表定义为 BigQuery 中的外部表。这被称为 Delta Lake BigLake。

您可以按如下方式操作:

    CREATE EXTERNAL TABLE `PROJECT_ID.DATASET.DELTALAKE_TABLE_NAME`
    WITH CONNECTION `PROJECT_ID.REGION.CONNECTION_ID`
    OPTIONS (
      format ="DELTA_LAKE",
      uris=['DELTA_TABLE_GCS_BASE_PATH']);

创建 Delta Lake BigLake 后,您可以使用 GoogleSQL 查询它。例如:

    SELECT field1, field2 FROM mydataset.my_cloud_storage_table;

您无需编写任何 Spark 即可将 Delta Lake 与 Google BigQuery 一起使用。

专用博客文章 中阅读更多内容。

已知限制

  • 此连接器作为预发布功能提供。预发布功能按“原样”提供,可能支持有限。
  • 支持带有删除向量和列映射的 Delta Lake 读取器版本 3。
  • 您必须在最后一个日志条目文件中列出读取器版本。例如,新表必须包含 00000..0.json
  • 不支持变更数据捕获 (CDC) 操作。任何现有 CDC 操作都将被忽略。
  • 模式是自动检测的。不支持使用 BigQuery 修改模式。
  • 表列名必须符合 BigQuery 列名限制
  • 不支持物化视图。
  • 数据类型可能会根据 类型映射矩阵 进行转换。

无需 Spark 的 Delta Lake 与 delta-rs

delta-rs 库允许您在没有 Spark 或 Java 的情况下使用 Python 或 Rust 读取、写入和管理 Delta Lake 表。它在底层使用 Apache Arrow,因此与 pandasDuckDBPolars 等其他 Arrow 原生或集成库兼容。

使用 delta-rs 结合 Delta Lake 完全避免了 JVM。

delta-rs 有两个公共 API:

  1. “rust deltalake”指 delta-rs 的 Rust API
  2. “python deltalake”指 delta-rs 的 Python API

python deltalake API 允许您从许多开源查询引擎使用 Delta Lake,包括:

  • pandas
  • polars
  • Dask
  • Daft
  • DuckDB
  • Datafusion

有关更多信息,请参阅 delta-rs 文档中的 集成页面

无需 Spark 的 Delta Lake:pandas

您可以使用 delta-rs 与 pandas 一起使用 Delta Lake。让我们看一个示例。

首先按如下方式导入 delta-rs 和 pandas:

    import pandas as pd
    from deltalake import write_deltalake, DeltaTable

定义两个字典来存储一些数据:

    data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
    data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}

使用第一个字典创建一个 DataFrame 并将其写入 Delta Lake 表:

    df = pd.DataFrame.from_dict(data)
    write_deltalake("tmp/pandas-table", df)

加载 Delta 表以检查结果:

    > DeltaTable("tmp/pandas-table/").to_pandas()

     first_name  age
    0        bob   47
    1         li   23
    2       leah   51

让我们追加其余数据:

    df2 = pd.DataFrame(data_2)
    write_deltalake("tmp/pandas-table", df2, mode="append")

重新读取以再次检查:

    > DeltaTable("tmp/pandas-table/").to_pandas()

     first_name  age
    0        bob   47
    1         li   23
    2       leah   51
    3        suh   33
    4      anais   68

您可以使用 version 关键字回溯到以前的版本:

    > DeltaTable("tmp/pandas-table/", version=0).to_pandas()

     first_name  age
    0        bob   47
    1         li   23
    2       leah   51

有关更多信息,请参阅 delta-rs 文档中的 Pandas 集成页面

无需 Spark 的 Delta Lake:Polars

您可以使用 delta-rs 与 Polars 一起使用 Delta Lake。让我们看一个示例。

首先导入 polars:

    import polars as pl

定义两个字典来存储一些数据:

    data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
    data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}

使用第一个字典创建一个 DataFrame 并将其写入 Delta Lake 表:

    df = pl.DataFrame(data)
    df.write_delta("tmp/polars_table")

读取 Delta 表并打印 DataFrame 以进行可视化:

    > print(pl.read_delta("tmp/polars_table"))

    ┌────────────┬─────┐
    │ first_name ┆ age │
    │ ---        ┆ --- │
    │ str        ┆ i64 │
    ╞════════════╪═════╡
    │ bob        ┆ 47  │
    │ li         ┆ 23  │
    │ leah       ┆ 51  │
    └────────────┴─────┘

使用第二个字典创建另一个 DataFrame 并将其追加到第一个:

    df = pl.DataFrame(data_2)
    df.write_delta("tmp/polars_table", mode="append")

读取并可视化:

    > print(pl.read_delta("tmp/polars_table"))

    ┌────────────┬─────┐
    │ first_name ┆ age │
    │ ---        ┆ --- │
    │ str        ┆ i64 │
    ╞════════════╪═════╡
    │ suh        ┆ 33  │
    │ anais      ┆ 68  │
    │ bob        ┆ 47  │
    │ li         ┆ 23  │
    │ leah       ┆ 51  │
    └────────────┴─────┘

使用时间旅行功能回溯到早期版本:

    > print(pl.read_delta("tmp/polars_table", version=0))

    ┌────────────┬─────┐
    │ first_name ┆ age │
    │ ---        ┆ --- │
    │ str        ┆ i64 │
    ╞════════════╪═════╡
    │ bob        ┆ 47  │
    │ li         ┆ 23  │
    │ leah       ┆ 51  │
    └────────────┴─────┘

请注意,与 pandas 不同,polars 有自己的 read_deltawrite_delta 方法。这意味着您无需显式导入 deltalake;它作为依赖项在 Polars 内部使用。

有关更多信息,请参阅 delta-rs 文档中的 Polars 集成页面

无需 Spark 的 Delta Lake:Dask

您可以使用 delta-rs 与 Dask 一起使用 Delta Lake。此功能可通过 dask-deltatable 库获得。

请注意,dask-deltatable 仅适用于 deltalake==0.13.0

让我们看一个示例。

如果您正在运行 dask >= 2024.3.0,您将不得不禁用 Dask 的新查询规划器才能与 dask-deltatable 配合使用。您可以通过设置以下内容来完成:

    dask.config.set({'dataframe.query-planning': False})

这仅在您在导入 dask-deltatable 之前设置配置时才有效。有关查询规划器的更多信息,请参阅 Dask 文档

接下来,导入 dask-deltatabledask.dataframe

    import dask_deltatable as ddt
    import dask.dataframe as dd

定义两个带有测试数据的字典:

    data = {'first_name': ['bob', 'li', 'leah'], 'age': [47, 23, 51]}
    data_2 = {"first_name": ["suh", "anais"], "age": [33, 68]}

让我们从第一个字典创建一个 Dask DataFrame:

    > ddf = dd.from_dict(data, npartitions=1)
> ddf.compute()

     first_name  age
    0        bob   47
    1         li   23
    2       leah   51

现在,将此 Dask DataFrame 写入 Delta 表:

    ddt.to_deltalake("tmp/dask-table", ddf)

并重新读取以确认:

    > delta_path = "tmp/dask-table/"
    > ddf = ddt.read_deltalake(delta_path)
    > ddf.compute()

     first_name  age
    0        bob   47
    1         li   23
    2       leah   51

让我们创建第二个 DataFrame,其中包含要追加到 Delta 表的数据:

    > ddf_2 = dd.from_dict(data_2, npartitions=1)
    > ddf_2.compute()

    	 first_name  age
    0        suh   33
    1      anais   68

并以 append 模式执行写入以将其添加到现有 Delta 表中:

    ddt.to_deltalake("tmp/dask-table", ddf_2, mode="append")

重新读取以确认:

    > delta_path = "tmp/dask-table/"
    > ddf = ddt.read_deltalake(delta_path)
    > ddf.compute()

     first_name  age
    0        bob   47
    1         li   23
    2       leah   51
    0        suh   33
    1      anais   68

太棒了。

您还可以使用 version 关键字参数回溯到 Delta 表的早期版本:

    > delta_path = "tmp/dask-table/"
    > ddf = ddt.read_deltalake(delta_path, version=0)
    > print(ddf.compute())

     first_name  age
    0        bob   47
    1         li   23
    2       leah   51

有关更多信息,请参阅 delta-rs 文档中的 Dask 集成页面

无需 Spark 的 Delta Lake:Daft

您可以使用 delta-rs 与 Daft 一起使用 Delta Lake。

Daft 目前支持没有时间旅行的读取操作。写入操作可在 非公开预览 API 功能 中获得。

让我们看一个示例。

您可以按如下方式将现有 Delta 表读取到 Daft DataFrame 中:

    > df = daft.read_delta_lake("tmp/pandas-table")
    > df.collect()

    ╭────────────┬───────╮
    │ first_name ┆ age   │
    │ ---        ┆ ---   │
    │ Utf8       ┆ Int64 │
    ╞════════════╪═══════╡
    │ bob        ┆ 47    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ li         ┆ 23    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ leah       ┆ 51    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ suh        ┆ 33    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ anais      ┆ 68    │
    ╰────────────┴───────╯

然后您可以查询或转换数据。例如:

    > df.where(df["age"] > 40).collect()

    ╭────────────┬───────╮
    │ first_name ┆ age   │
    │ ---        ┆ ---   │
    │ Utf8       ┆ Int64 │
    ╞════════════╪═══════╡
    │ bob        ┆ 47    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ leah       ┆ 51    │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
    │ anais      ┆ 68    │
    ╰────────────┴───────╯

然后您可以使用 write_deltalake 将数据写入 Delta 表:

df.write_deltalake("tmp/daft-table", mode="overwrite")

Daft 支持多种写入模式。

有关更多信息,请参阅 delta-rs 文档中的 Daft 集成页面

无需 Spark 的 Delta Lake:DuckDB

您可以使用 delta-rs 与 DuckDB 一起使用 Delta Lake。

Delta Lake 表可以作为 Arrow 表和 Arrow 数据集 公开,这允许与各种查询引擎进行互操作。您可以使用 Arrow 作为中间步骤来使用 Delta Lake 与 DuckDB。

让我们看一个示例。

首先导入 duckdbdeltalake

    import duckdb
    from deltalake import write_deltalake, DeltaTable

现在加载现有的 Delta 表,例如我们之前使用 pandas 创建的表:

    dt = DeltaTable("tmp/pandas-table/")

将此 Delta 表转换为 DuckDB 数据集,使用 Arrow 数据集作为中间步骤:

    arrow_data = dt.to_pyarrow_dataset()
    duck_data = duckdb.arrow(arrow_data)

现在您可以查询您的 DuckDB 数据集:

    > query = """
    > select
    >  age
    > from duck_data
    > order by 1 desc
    > """

    > duckdb.query(query)

    ┌───────┐
    │  age  │
    │ int64 │
    ├───────┤
    │    68 │
    │    51 │
    │    47 │
    │    33 │
    │    23 │
    └───────┘

要将此数据写入 Delta 表,请首先将其转换为 Arrow 表。然后使用 write_deltalake 将其写入 Delta 表:

    arrow_table = duckdb.query(query).to_arrow_table()

    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/duckdb-table",
        mode="overwrite",
    )

重新读取以确认:

    > dt = DeltaTable("tmp/duckdb-table/")
    > dt.to_pandas()

      age
    0   68
    1   51
    2   47
    3   33
    4   23

太棒了。

现在让我们更新查询以将其限制为仅 3 条记录:

    query = """
    select
      age
    from duck_data
    order by 1 desc
    limit 3
    """

并覆盖现有 Delta 表:

    arrow_table = duckdb.query(query).to_arrow_table()

    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/duckdb-table",
        mode="overwrite",
    )

重新读取以确认:

    > dt = DeltaTable("tmp/duckdb-table/")
    > dt.to_pandas()

      age
    0   68
    1   51
    2   47

您可以使用 version 关键字参数在 Delta 表的不同版本之间进行时间旅行:

    > dt = DeltaTable("tmp/duckdb-table/", version=0)
    > dt.to_pandas()

      age
    0   68
    1   51
    2   47
    3   33
    4   23

还有一个用于 Delta Lake 的实验性 DuckDB 扩展。此扩展由 DuckDB 维护,您可以在 Github 存储库 中阅读相关内容。该扩展目前仅支持读取操作。

无需 Spark 的 Delta Lake:Datafusion

您可以使用 delta-rs 与 Datafusion 一起使用 Delta Lake。您可以使用 Arrow 作为中间步骤来使用 Delta Lake 与 DuckDB。

让我们看一个示例。

首先导入 datafusiondeltalake

    from datafusion import SessionContext
    from deltalake import write_deltalake, DeltaTable

初始化 Datafusion 会话上下文:

    ctx = SessionContext()

现在加载现有的 Delta 表,例如我们之前使用 pandas 创建的表:

    table = DeltaTable("tmp/pandas-table/")

将此 Delta 表转换为 PyArrow 数据集并将其注册为 Datafusion 表:

    arrow_data = table.to_pyarrow_dataset()
    ctx.register_dataset("my_delta_table", arrow_data)

现在您可以查询您的 Datafusion 数据集:

    > query = "select age from my_delta_table order by 1 desc"
    > ctx.sql(query)

    DataFrame()
    +-----+
    | age |
    +-----+
    | 68  |
    | 51  |
    | 47  |
    | 33  |
    | 23  |
    +-----+

要将此数据写入 Delta 表,请首先将其转换为 Arrow 表。然后使用 write_deltalake 将其写入 Delta 表:

    arrow_table = ctx.sql(query).to_arrow_table()

    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/datafusion-table",
    )

重新读取以确认:

    > dt = DeltaTable("tmp/datafusion-table/")
    > dt.to_pandas()

      age
    0   68
    1   51
    2   47
    3   33
    4   23

太棒了。

现在让我们更新查询以将其限制为仅 3 条记录:

    query =
    "select age from my_delta_table order by 1 desc limit 3"

并覆盖现有 Delta 表:

    arrow_table = ctx.sql(query).to_arrow_table()

    write_deltalake(
        data=arrow_table,
        table_or_uri="tmp/datafusion-table",
        mode="overwrite",
    )

重新读取以确认:

    > dt = DeltaTable("tmp/datafusion-table/")
    > dt.to_pandas()

      age
    0   68
    1   51
    2   47

您可以使用 version 关键字参数在 Delta 表的不同版本之间进行时间旅行:

    > dt = DeltaTable("tmp/datafusion-table/", version=0)
    > dt.to_pandas()

      age
    0   68
    1   51
    2   47
    3   33
    4   23

有关更多信息,请参阅 delta-rs 文档中的 Datafusion 集成页面

无需 Spark 的 Delta Lake:结论

有许多方法可以在不使用 Spark 的情况下使用 Delta Lake。

专用的 Delta 连接器 允许您从 Flink、Hive、Trino、PrestoDB 等引擎使用 Delta Lake。

delta-rs 包允许您在 Rust 或 Python 中使用 Delta Lake,例如与 pandas、polars、Dask、Daft、DuckDB 等一起使用。

查看 示例笔记本,亲自运行 delta-rs 代码。

LinkedIn 上关注我们的作者