使用 Spark Structured Streaming 将 Kafka 流写入 Delta Lake
作者:Bo Gao 、 Matthew Powers
这篇博文解释了如何使用 Spark Structured Streaming 将 Kafka 流转换为 Delta Lake 表。
数据通常只在 Kafka 中存储几个小时到几天,因此最好将这些数据持久化为长期存储格式。Delta Lake 提供了许多对流式用例有用的数据管理技术。Delta Lake 还允许使用各种引擎进行快速的下游查询。
让我们首先展示如何读取 Kafka 流并写入 Delta Lake 表
定期读取 Kafka 流并写入 Delta Lake 表
假设您有一个 Kafka 流,它以以下 JSON 格式输出数据
{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}
student_name
字段包含由 XX
分隔的名字和姓氏。我们希望将此 Kafka 流摄取到 Delta 表中,并将 student_name
字段拆分为 first_name
和 last_name
字段。
以下是如何将 Kafka 流读取到 Spark DataFrame 中
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", subscribeTopic)
.load()
)
我们需要手动定义 Kafka 流的模式才能摄取它
schema = StructType([
StructField("student_name", StringType()),
StructField("graduation_year", StringType()),
StructField("major", StringType()),
])
创建一个函数,它将处理 JSON 输入数据并返回一个格式正确的 Spark DataFrame,该 DataFrame 可以输出到 Delta 表中
def with_normalized_names(df, schema):
parsed_df = (
df.withColumn("json_data", from_json(col("value").cast("string"), schema))
.withColumn("student_name", col("json_data.student_name"))
.withColumn("graduation_year", col("json_data.graduation_year"))
.withColumn("major", col("json_data.major"))
.drop(col("json_data"))
.drop(col("value"))
)
split_col = split(parsed_df["student_name"], "XX")
return (
parsed_df.withColumn("first_name", split_col.getItem(0))
.withColumn("last_name", split_col.getItem(1))
.drop("student_name")
)
现在创建一个函数,每当运行它时,它将读取 Kafka 中的所有新数据。
def perform_available_now_update():
checkpointPath = "data/tmp_students_checkpoint/"
deltaPath = "data/tmp_students_delta"
return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
availableNow=True
).format("delta").option("checkpointLocation", checkpointPath).start(deltaPath)
调用 perform_available_now_update()
函数并查看 Delta 表的内容。
perform_available_now_update()
spark.read.format("delta").load(deltaPath).show()
我们可以随时调用 perform_available_now_update()
函数,以使用 Kafka 流中的最新数据更新 Delta 表。您可以根据业务的延迟需求,每 10 分钟、每小时或每天运行此函数。
checkpointPath
跟踪已处理的 Kafka 数据,因此您无需手动跟踪已摄取到 Delta 表中的数据。Spark Structured Streaming 使这些增量更新变得非常容易。
连续读取 Kafka 流并写入 Delta Lake 表
在前面的示例中,我们展示了如何使用 availableNow
触发器读取 Kafka 流,该触发器允许定期处理。现在让我们看看如何使用 Spark Structured Streaming 连续处理 Kafka 流中的新数据。
这是一个函数,它每 10 秒读取 Kafka 中的最新数据并连续处理它
def perform_trigger_fixed_interval_update():
checkpointPath = "data/tmp_students_checkpoint/"
deltaPath = "data/tmp_students_delta"
return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
processingTime='10 seconds'
).format("delta").option("checkpointLocation", checkpointPath).start(deltaPath)
对于连续流式传输,您需要一个始终运行的集群。这通常比定期预置新集群和增量处理新数据更昂贵。
连续流式传输适用于低延迟数据处理管道。当延迟要求较高时,增量处理效果很好。
对于连续流式查询,您还可以跳过手动设置触发器并依赖默认触发器。如果数据可以在预定义的处理时间内处理,这可能比处理时间触发器具有更低的延迟。
现在让我们关注将 Kafka 流摄取到 Delta 表中如何产生许多小文件。
为什么流式传输会导致小文件问题
当 Lakehouse 存储系统包含的文件大小适中时,查询引擎的性能最佳。小文件过多的 Delta 表存在小文件问题,这会导致查询运行缓慢。
从 Kafka 到 Delta 表的流式传输可能会导致大量小文件,尤其是当延迟较低和/或 Delta 表使用不利于流式传输的 Hive 样式分区键时。
如果您每 5 秒流式传输到 Delta 表,则很可能会创建大量小文件。按 ingestion_date
等键分区的 Delta 表不应创建过量的小文件。但是按高基数列(如 medical_code
)分区的 Delta 表也可能创建大量小文件。有关更多详细信息,请参阅这篇关于Hive 样式分区的优缺点的博文。
您应该设计 Kafka 数据摄取管道,以生成尽可能少的小文件。幸运的是,Delta Lake 有一个内置解决方案来解决小文件问题。
小文件压缩
Delta Lake 使压缩小文件和解决小文件问题变得容易。
每当 Delta 表累积大量小文件时,您都可以运行一行命令将小文件重新排列为适当大小的文件。
您还可以使用谓词仅压缩 Delta 表的一个子集,因此操作相对轻量。例如,您可以每天运行一个作业,压缩前一天摄取的所有文件。
请参阅这篇关于使用 OPTIMIZE 进行 Delta Lake 小文件压缩的博文以了解更多信息。
Delta Lake 还为 Kafka 工作负载提供了其他几个优势。
Delta Lake 对于 Kafka 流数据的优势
Delta Lake 对于 Kafka 工作流还有其他几个优势。
- 模式强制:Delta 表默认启用模式强制,因此如果 Kafka 流数据的模式发生变化,不匹配的数据不会损坏您的表
- 模式演进:您可以选择性地启用模式演进,以便您的表的模式可以随时间变化。
- 并发控制:Delta Lake 提供不同类型的并发控制,因此同时执行的命令不会损坏表。
- DML 操作:Delta Lake 使运行删除、更新和合并命令变得容易。
- 大型连接器生态系统:许多引擎可以读写 Delta 表。将 Kafka 流摄取到 Delta 表中,数据可以轻松地被 Spark、pandas、Polars、Trino 和许多其他查询引擎读取,请参阅此处查看完整的集成列表。
将 Kafka 流数据写入 Delta 表以进行更好的下游处理有很多很好的理由。
结论
这篇博文向您展示了使用 Spark Structured Streaming 读取 Kafka 流并将其写入 Delta 表是多么容易。您还可以使用其他项目将 Kafka 流写入 Delta 表,例如 kafka-delta-ingest。这种工作流程为从业者带来了很多好处,特别是当 Delta Lake 与传统数据湖进行比较时。有关更多详细信息,请参阅这篇关于为什么 Delta Lake 优于 Parquet 数据湖的博文。