深入解析SparkStructuredStreaming:输出模式与触发器
立即解锁
发布时间: 2025-08-24 00:46:40 阅读量: 1 订阅数: 3 

# Spark Streaming:输出模式与触发机制深度解析
## 1. 内存表操作与输出模式概述
在进行流处理时,我们可能会涉及对内存表的操作。例如,统计“rates”内存表中的行数:
```scala
spark.sql("select count(*) from rates").show
```
输出结果如下:
```
+---------+
| count(1)|
+---------+
| 100|
+---------+
```
若要停止“ratesSQ”查询流,可使用以下代码:
```scala
ratesSQ.stop
```
需要注意的是,即使流查询“ratesSQ”停止,内存中的“rates”数据仍然存在。但当启动一个同名的新流查询时,内存中的数据将被截断。
在深入了解输出模式之前,我们需要先了解不同数据接收器支持的输出模式,如下表所示:
| 数据接收器 | 支持的输出模式 | 备注 |
| --- | --- | --- |
| File | Append | 仅支持写入新行,不支持更新 |
| Kafka | Append, Update, Complete | - |
| Foreach | Append, Update, Complete | 取决于 `ForeachWriter` 的实现 |
| Console | Append, Update, Complete | - |
| Memory | Append, Complete | 不支持原地更新 |
## 2. 输出模式深度解析
### 2.1 无状态流查询
无状态流查询主要对传入的流数据进行基本转换,然后将数据写入数据接收器。典型的用例是实时流 ETL,例如持续读取在线服务产生的页面视图事件,以捕获哪些用户正在查看哪些页面。通常会执行以下操作:
- **过滤、转换和清理**:现实世界的数据往往杂乱无章,结构可能不适合重复分析。
- **转换为更高效的存储格式**:如将文本文件格式(如 CSV 和 JSON)转换为更高效的二进制格式(如 ORC、Parquet 或 Avro),以减少文件大小并提高分析速度。
- **按特定列分区数据**:在将数据写入数据接收器时,可根据常用列的值对数据进行分区,以加快组织内各团队的重复分析。
由于这些任务在将数据写入数据接收器之前不需要维护任何状态,因此无状态流查询唯一适用的输出模式是“Append”。“Complete”输出模式不适用,因为它要求 Structured Streaming 维护所有先前的数据,这可能会导致数据量过大而难以维护。“Update”输出模式也不适用,因为只写入新数据。不过,当将“Update”模式用于无状态流查询时,Structured Streaming 会将其视为“Append”模式。
以下是使用不适当的“Complete”输出模式进行无状态流查询的示例:
```scala
val ratesDF = spark.readStream.format("rate")
.option("rowsPerSecond","10")
.option("numPartitions","2")
.load()
// 简单转换
val ratesOddEvenDF = ratesDF.withColumn("even_odd", $"value" % 2 === 0)
// 使用 complete 输出模式写入 Console 数据接收器
val ratesSQ = ratesOddEvenDF.writeStream.outputMode("complete")
.format("console")
.option("truncate",false)
.option("numRows",50)
.start()
```
运行上述代码时,会抛出异常:
```
org.apache.spark.sql.AnalysisException: Complete output mode not supported
when there are no streaming aggregations on streaming DataFrames/Datasets;
```
### 2.2 有状态流查询
有状态流查询在进行聚合操作(如通过 `groupBy` 转换)时,聚合状态由 Structured Streaming 引擎隐式维护。随着更多数据的到来,新数据的聚合结果会更新到结果表中。在每个触发点,根据输出模式,将更新后的数据或结果表中的所有数据写入数据接收器。
对于有状态查询,使用“Append”输出模式是不合适的,因为它违反了该输出模式的语义,即只将追加到结果表的新行发送到指定的输出接收器。因此,只有“Complete”和“Update”输出模式适用于由 Structured Streaming 引擎隐式维护聚合状态的有状态查询类型。使用“Complete”输出模式的流查询输出总是等于或多于使用“Update”输出模式的相同流查询输出。
以下是展示“Update”和“Complete”模式输出差异的示例代码:
```scala
// 导入语句
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val mobileDataSchema = new StructType().add("id", StringType, false)
.add("action", StringType, false)
.add("ts", TimestampType, false)
val mobileDF = spark.readStream.schema(mobileDataSchema)
.json("<path>/chapter6/data/input")
val actionCountDF = mobileDF.groupBy($"action").count
val completeModeSQ = actionCountDF.writeStream.format("console")
.option("truncate", "false")
.outputMode("complete").start()
val updateModeSQ = actionCountDF.writeStream.format("console")
.option("truncate", "false")
.outputMode("complete").start()
```
运行上述代码后,将文件 `file1.json`、`file2.json`、`file3.json` 和 `newaction.json` 从 `mobile` 目录复制到 `input` 目录。“Complete”模式的流查询输出如下:
```
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+------+
| action| count|
+-------+------+
| close| 3|
| swipe| 1|
| crash| 1|
| open| 5|
+-------+------+
```
“Update”模式的流查询输出如下:
```
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+
```
0
0
复制全文
相关推荐








