Delta Lake 入门
本指南帮助您快速探索 Delta Lake 的主要功能。它提供了代码片段,展示了如何从交互式、批处理和流式查询中读取和写入 Delta 表。它还演示了表更新和时间旅行。
使用 Delta Lake 设置 Apache Spark
按照这些说明使用 Spark 设置 Delta Lake。您可以通过以下两种方式在本地计算机上运行本指南中的步骤
- 交互式运行:使用 Delta Lake 启动 Spark shell(Scala 或 Python),并在 shell 中交互式运行代码片段。
- 作为项目运行:使用 Delta Lake 设置 Maven 或 SBT 项目(Scala 或 Java),将代码片段复制到源文件,然后运行项目。或者,您可以使用 GitHub 存储库中提供的示例。
设置交互式 shell
要在 Spark Scala 或 Python shell 中交互式使用 Delta Lake,您需要本地安装 Apache Spark。根据您是想使用 Python 还是 Scala,您可以分别设置 PySpark 或 Spark shell。对于以下所有说明,请确保安装与 Delta Lake 2.1.0 兼容的正确版本的 Spark 或 PySpark。有关详细信息,请参阅发行版兼容性矩阵。
PySpark shell
-
通过运行以下命令安装与 Delta Lake 版本兼容的 PySpark 版本
pip install pyspark==<compatible-spark-version>
-
运行带有 Delta Lake 包和附加配置的 PySpark
pyspark --packages io.delta:delta-core_2.12:2.1.0 \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Spark Scala shell
按照下载 Spark 中的说明下载兼容版本的 Apache Spark,既可以使用 pip
,也可以下载并解压缩存档,然后在解压缩的目录中运行 spark-shell。
bin/spark-shell --packages io.delta:delta-core_2.12:2.1.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
设置项目
如果您想使用 Maven Central Repository 中的 Delta Lake 二进制文件构建项目,可以使用以下 Maven 坐标。
Maven
您可以通过在 POM 文件中将其添加为依赖项来将 Delta Lake 包含在 Maven 项目中。Delta Lake 使用 Scala 2.12 编译。
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>2.1.0</version>
</dependency>
SBT
您可以通过将以下行添加到 build.sbt
文件中来将 Delta Lake 包含在 SBT 项目中
libraryDependencies += "io.delta" %% "delta-core" % "2.1.0"
Python
要设置 Python 项目(例如,用于单元测试),您可以使用 pip install delta-spark
安装 Delta Lake,然后使用 Delta Lake 中的 configure_spark_with_delta_pip()
实用函数配置 SparkSession
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
创建表
要创建 Delta 表,请以 delta 格式写入 DataFrame。您可以使用现有的 Spark SQL 代码,并将格式从 parquet、csv、json 等更改为 delta。
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
这些操作使用从您的 DataFrame 推断出的 schema 创建一个新的 Delta 表。有关创建新 Delta 表时可用的完整选项集,请参阅创建表和写入表文档文章。
本指南使用本地路径作为 Delta 表位置。有关 Delta 表的 HDFS 或云存储配置,请参阅存储配置文档文章。
读取数据
您可以通过指定文件路径 "/tmp/delta-table"
来读取 Delta 表中的数据
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
更新表数据
Delta Lake 支持使用标准 DataFrame API 修改表的多种操作。此示例运行一个批处理作业来覆盖表中的数据
覆盖
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
如果您再次读取此表,您应该只会看到您添加的值 5-9,因为您覆盖了之前的数据。
不带覆盖的条件更新
Delta Lake 提供了程序化 API 来有条件地更新、删除和合并(upsert)数据到表中。这里有一些示例
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
# Upsert (merge) new data
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
您应该看到一些现有行已更新,并且已插入新行。
有关这些操作的更多信息,请参阅表删除、更新和合并文档文章。
使用时间旅行读取旧版本数据
您可以通过时间旅行查询 Delta 表的先前快照。如果您想访问您覆盖的数据,您可以使用 versionAsOf 选项查询您覆盖第一组数据之前的表快照。
df = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/tmp/delta-table")
df.show()
您应该会看到第一组数据,即您覆盖它之前的数据。时间旅行利用 Delta Lake 事务日志的强大功能来访问表中不再存在的数据。删除 version 0 选项(或指定 version 1)将让您再次看到更新的数据。有关更多信息,请参阅查询表的旧快照(时间旅行)文档文章。
将数据流写入表
您还可以使用 Structured Streaming 写入 Delta 表。Delta Lake 事务日志保证了精确一次处理,即使有其他流或批处理查询同时针对该表运行。默认情况下,流以追加模式运行,这会将新记录添加到表中
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf \
.selectExpr("value as id") \
.writeStream.format("delta") \
.option("checkpointLocation", "/tmp/checkpoint") \
.start("/tmp/delta-table")
在流运行时,您可以使用前面的命令读取表。
如果您在 shell 中运行此命令,您可能会看到流任务进度,这使得在该 shell 中键入命令变得困难。在新终端中启动另一个 shell 来查询表可能会很有用。
您可以通过在启动流的同一终端中运行 stream.stop()
来停止流。
有关 Delta Lake 与 Structured Streaming 集成的更多信息,请参阅表流式读写文档文章。
从表中读取更改流
当流写入 Delta 表时,您也可以将该表作为流式源读取。例如,您可以启动另一个流式查询,打印对 Delta 表所做的所有更改。您可以通过向 Structured Streaming 提供 startingVersion
或 startingTimestamp
选项来指定从哪个版本开始获取更改。有关详细信息,请参阅Structured Streaming文档文章。
stream2 = spark.readStream.format("delta") \
.load("/tmp/delta-table") \
.writeStream.format("console") \
.start()
下一步
以下是有关 Delta Lake 的更多学习资源