如何创建 Delta Lake 表
有多种简单的方法可以创建 Delta Lake 表。本文将解释如何使用 SQL、PySpark 和其他技术来完成此操作。它还将向您展示如何从存储在 CSV 和 Parquet 文件中的数据创建 Delta Lake 表。
Delta Lake 是开源的,并将数据存储在开放的 Apache Parquet 文件格式中。其开放性使其成为适用于各种用例的灵活文件协议。Delta Lake 与广泛的其他技术(例如 Apache Flink、Apache Hive、Apache Kafka、PrestoDB、Apache Spark 和 Trino 等)无缝协作,并提供 Rust、Python、Scala、Java 等语言 API,使组织能够轻松地将该框架集成到其现有的 ETL 管道中。
创建 Delta Lake 表的最佳方式取决于您的设置和技术偏好。本文将向您展示几种不同的选项,以便您可以根据自己的情况选择最佳选项。
让我们深入研究一些代码片段,看看如何创建 Delta Lake 表。有关本文中的所有代码,请参阅此 Jupyter Notebook。
从 DataFrame 创建 Delta Lake 表
您可以将 PySpark DataFrame 写入 Delta Lake,从而创建 Delta Lake 表。
为了演示,我们首先创建一个包含几行数据的 PySpark DataFrame
columns = ["character", "franchise"]
data = [("link", "zelda"), ("king k rool", "donkey kong"), ("samus", "metroid")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
df.show()
+-----------+-----------+
| character| franchise|
+-----------+-----------+
| link| zelda|
|king k rool|donkey kong|
| samus| metroid|
+-----------+-----------+
以下是如何将此 DataFrame 写入 Parquet 文件并创建表(您可能熟悉此操作)
df.write.format("parquet").saveAsTable("table1_as_parquet")
创建 Delta Lake 表使用几乎相同的语法——只需将格式从“parquet”切换到“delta”即可,非常简单
df.write.format("delta").saveAsTable("table1")
我们可以运行命令来确认该表确实是 Delta Lake 表
DeltaTable.isDeltaTable(spark, "spark-warehouse/table1") # True
我们可以通过 PySpark API 获取此表的内容
spark.table("table1").show()
+-----------+-----------+
| character| franchise|
+-----------+-----------+
|king k rool|donkey kong|
| samus| metroid|
| link| zelda|
+-----------+-----------+
Delta Lake 比其他表格文件格式(如 CSV 和 Parquet)具有许多优势:它支持 ACID 事务、时间旅行、版本化数据等。除非您有充分的理由使用其他文件格式,否则您通常会希望使用 Delta Lake。
从 PySpark DataFrame 创建 Delta Lake 表非常简单。使用编程 DeltaTable API 创建 Delta Lake 表也很简单。
使用 PySpark API 创建 Delta Lake 表
以下是使用 PySpark API 创建 Delta Lake 表的方法
from pyspark.sql.types import *
dt1 = (
DeltaTable.create(spark)
.tableName("testTable1")
.addColumn("c1", dataType="INT", nullable=False)
.addColumn("c2", dataType=IntegerType(), generatedAlwaysAs="c1 + 1")
.partitionedBy("c1")
.execute()
)
这将创建一个包含 c1
和 c2
列的空 Delta Lake 表。
如果表已存在,则 create 方法将出错。为避免这种情况,您可以改用 createIfNotExists
方法。
Delta Lake 的流式 API 提供了一种优雅的方式来使用 PySpark 代码创建表。该 API 还允许您指定生成列和属性。
使用 SQL 创建 Delta Lake 表
您可以使用纯 SQL 命令创建 Delta Lake 表,类似于在关系数据库中创建表
spark.sql("""
CREATE TABLE table2 (country STRING, continent STRING) USING delta
""")
让我们向新创建的 Delta Lake 表添加一些数据
spark.sql("""
INSERT INTO table2 VALUES
('china', 'asia'),
('argentina', 'south america')
""")
然后将其打印出来以验证数据是否已正确添加
spark.sql("SELECT * FROM table2").show()
+---------+-------------+
| country| continent|
+---------+-------------+
|argentina|south america|
| china| asia|
+---------+-------------+
我们可以使用以下命令确认该表是 Delta Lake 表
spark.sql("DESCRIBE DETAIL table2").select("format").show()
+------+
|format|
+------+
| delta|
+------+
通过 SQL 手动创建 Delta Lake 表很简单,一旦创建了表,您就可以像往常一样对其执行其他数据操作。
从 CSV 创建 Delta Lake 表
假设您有以下 students1.csv
文件
student_name,graduation_year,major
someXXperson,2023,math
liXXyao,2025,physics
您可以使用这些命令将此 CSV 文件读入 Spark DataFrame 并将其写入 Delta Lake 表
df = spark.read.option("header", True).csv("students1.csv")
df.write.format("delta").saveAsTable("students")
对于单个 CSV 文件,您甚至不需要使用 Spark:您可以简单地使用不依赖 Spark 的 delta-rs,并从 Pandas DataFrame 创建 Delta Lake。如果您有多个 CSV 文件,使用 PySpark 通常更好,因为它可以并行读取多个文件。
以下是如何使用多个 CSV 文件创建 Delta Lake 表
df = spark.read.option("header", True).csv("path/with/csvs/")
df.write.format("delta").save("some/other/path")
从 Parquet 创建 Delta Lake 表
您可以遵循类似的设计模式将 Parquet 文件转换为 Delta Lake,将它们读入 Spark DataFrame,然后将它们写入 Delta Lake——但还有一种更简单的方法。
Delta Lake 将数据存储在 Parquet 文件中,将元数据存储在事务日志中。从 Parquet 文件创建 Delta Lake 时,您不需要重写数据:您可以执行就地操作,只需将事务日志添加到现有 Parquet 文件文件夹中。以下是执行此操作的方法
DeltaTable.convertToDelta(spark, "parquet.`tmp/lake1`")
假设您在 tmp/lake1 中存储了以下 Parquet 文件
tmp/lake1
├── _SUCCESS
├── part-00000-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
├── part-00003-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
├── part-00006-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
└── part-00009-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
以下是文件转换为 Delta Lake 后的样子
tmp/lake1
├── _SUCCESS
├── _delta_log
│ ├── 00000000000000000000.checkpoint.parquet
│ ├── 00000000000000000000.json
│ └── _last_checkpoint
├── part-00000-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
├── part-00003-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
├── part-00006-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
└── part-00009-1f1cc136-76ea-4185-84d6-54f7e758bfb7-c000.snappy.parquet
有关更多信息,请参阅博客文章“从 Parquet 转换为 Delta Lake”。
从其他技术创建 Delta Lake 表
Delta Lake 的开放性允许建立强大的连接器生态系统。这意味着您可以使用各种其他技术创建 Delta Lake。以下是一些示例
- delta-rs Python 绑定允许您从 pandas DataFrame 创建 Delta Lake。
- kafka-delta-ingest 是一种将数据从 Kafka 流式传输到 Delta Lake 的高效方法。
- 连接器仓库包含 Delta Standalone,这是一个不依赖于 Spark 的 Java 库,它允许基于 Java 的连接器,如 Hive 和 Flink。
Delta Lake 社区不断发展连接器生态系统,许多开发人员为其内部项目构建连接器并慷慨捐赠。Delta 生态系统是一个友好且富有成效的贡献场所。以下是 Delta Lake 新贡献者在他们的第一个拉取请求合并后的反馈
结论
本文向您展示了创建 Delta Lake 表的多种方法:从 DataFrame、从 CSV 或 Parquet 文件、使用 SQL 或通过 Delta Lake 生态系统中的各种其他连接器。
由于该框架是开源的,因此可以使用任何技术创建 Delta Lake;它只需要遵循 Delta Lake 协议。
开放格式不会受到供应商锁定的困扰,这也是数据专业人员越来越多地转向 Delta Lake 等开放协议的部分原因。数据社区喜欢 Delta Lake 的强大功能、开放性和庞大的生态系统。
Delta Lake 社区欢迎新成员。请随时加入我们的 Slack 频道——社区将很乐意帮助您加入!您还可以在 Twitter 或 LinkedIn 上关注我们。