The Linux Foundation Projects
Delta Lake

Delta Lake 入门

本指南帮助您快速探索 Delta Lake 的主要功能。它提供了代码片段,展示了如何从交互式、批处理和流式查询中读取和写入 Delta 表。它还演示了表更新和时间旅行。

使用 Delta Lake 设置 Apache Spark

按照这些说明使用 Spark 设置 Delta Lake。您可以通过以下两种方式在本地计算机上运行本指南中的步骤

  1. 交互式运行:使用 Delta Lake 启动 Spark shell(Scala 或 Python),并在 shell 中交互式运行代码片段。
  2. 作为项目运行:使用 Delta Lake 设置 Maven 或 SBT 项目(Scala 或 Java),将代码片段复制到源文件,然后运行项目。或者,您可以使用 GitHub 存储库中提供的示例

设置交互式 shell

要在 Spark Scala 或 Python shell 中交互式使用 Delta Lake,您需要本地安装 Apache Spark。根据您是想使用 Python 还是 Scala,您可以分别设置 PySpark 或 Spark shell。对于以下所有说明,请确保安装与 Delta Lake 2.1.0 兼容的正确版本的 Spark 或 PySpark。有关详细信息,请参阅发行版兼容性矩阵。

PySpark shell

  1. 通过运行以下命令安装与 Delta Lake 版本兼容的 PySpark 版本

    pip install pyspark==<compatible-spark-version>
  2. 运行带有 Delta Lake 包和附加配置的 PySpark

    pyspark --packages io.delta:delta-core_2.12:2.1.0 \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Spark Scala shell

按照下载 Spark 中的说明下载兼容版本的 Apache Spark,既可以使用 pip,也可以下载并解压缩存档,然后在解压缩的目录中运行 spark-shell。

bin/spark-shell --packages io.delta:delta-core_2.12:2.1.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

设置项目

如果您想使用 Maven Central Repository 中的 Delta Lake 二进制文件构建项目,可以使用以下 Maven 坐标。

Maven

您可以通过在 POM 文件中将其添加为依赖项来将 Delta Lake 包含在 Maven 项目中。Delta Lake 使用 Scala 2.12 编译。

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.12</artifactId>
  <version>2.1.0</version>
</dependency>

SBT

您可以通过将以下行添加到 build.sbt 文件中来将 Delta Lake 包含在 SBT 项目中

libraryDependencies += "io.delta" %% "delta-core" % "2.1.0"

Python

要设置 Python 项目(例如,用于单元测试),您可以使用 pip install delta-spark 安装 Delta Lake,然后使用 Delta Lake 中的 configure_spark_with_delta_pip() 实用函数配置 SparkSession

from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

创建表

要创建 Delta 表,请以 delta 格式写入 DataFrame。您可以使用现有的 Spark SQL 代码,并将格式从 parquet、csv、json 等更改为 delta。

data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

这些操作使用从您的 DataFrame 推断出的 schema 创建一个新的 Delta 表。有关创建新 Delta 表时可用的完整选项集,请参阅创建表写入表文档文章。

本指南使用本地路径作为 Delta 表位置。有关 Delta 表的 HDFS 或云存储配置,请参阅存储配置文档文章。

读取数据

您可以通过指定文件路径 "/tmp/delta-table" 来读取 Delta 表中的数据

df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

更新表数据

Delta Lake 支持使用标准 DataFrame API 修改表的多种操作。此示例运行一个批处理作业来覆盖表中的数据

覆盖

data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

如果您再次读取此表,您应该只会看到您添加的值 5-9,因为您覆盖了之前的数据。

不带覆盖的条件更新

Delta Lake 提供了程序化 API 来有条件地更新、删除和合并(upsert)数据到表中。这里有一些示例

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))

# Upsert (merge) new data
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

您应该看到一些现有行已更新,并且已插入新行。

有关这些操作的更多信息,请参阅表删除、更新和合并文档文章。

使用时间旅行读取旧版本数据

您可以通过时间旅行查询 Delta 表的先前快照。如果您想访问您覆盖的数据,您可以使用 versionAsOf 选项查询您覆盖第一组数据之前的表快照。

df = spark.read.format("delta") \
  .option("versionAsOf", 0) \
  .load("/tmp/delta-table")

df.show()

您应该会看到第一组数据,即您覆盖它之前的数据。时间旅行利用 Delta Lake 事务日志的强大功能来访问表中不再存在的数据。删除 version 0 选项(或指定 version 1)将让您再次看到更新的数据。有关更多信息,请参阅查询表的旧快照(时间旅行)文档文章。

将数据流写入表

您还可以使用 Structured Streaming 写入 Delta 表。Delta Lake 事务日志保证了精确一次处理,即使有其他流或批处理查询同时针对该表运行。默认情况下,流以追加模式运行,这会将新记录添加到表中

streamingDf = spark.readStream.format("rate").load()

stream = streamingDf \
  .selectExpr("value as id") \
  .writeStream.format("delta") \
  .option("checkpointLocation", "/tmp/checkpoint") \
  .start("/tmp/delta-table")

在流运行时,您可以使用前面的命令读取表。

如果您在 shell 中运行此命令,您可能会看到流任务进度,这使得在该 shell 中键入命令变得困难。在新终端中启动另一个 shell 来查询表可能会很有用。

您可以通过在启动流的同一终端中运行 stream.stop() 来停止流。

有关 Delta Lake 与 Structured Streaming 集成的更多信息,请参阅表流式读写文档文章。

从表中读取更改流

当流写入 Delta 表时,您也可以将该表作为流式源读取。例如,您可以启动另一个流式查询,打印对 Delta 表所做的所有更改。您可以通过向 Structured Streaming 提供 startingVersionstartingTimestamp 选项来指定从哪个版本开始获取更改。有关详细信息,请参阅Structured Streaming文档文章。

stream2 = spark.readStream.format("delta") \
  .load("/tmp/delta-table") \
  .writeStream.format("console") \
  .start()

下一步

以下是有关 Delta Lake 的更多学习资源