实时物联网分析:从设备模拟到数据洞察
立即解锁
发布时间: 2025-08-14 01:32:48 阅读量: 11 订阅数: 19 


Azure数据工程实战指南:从入门到精通
### 实时物联网分析:从设备模拟到数据洞察
随着物联网技术的发展,实时物联网分析、高级分析和实时机器学习洞察成为众多组织追求业务发展和目标推进的重要领域。Apache Spark 的 Structured Streaming 为大数据工作负载的高级和流式分析提供了强大的支持。本文将详细介绍如何使用设备模拟器实现实时物联网分析,并将数据持久化到 Delta Lake 中。
#### 1. 准备工作
在开始之前,需要完成以下准备步骤:
1. 安装并运行 IoT 设备模拟器 Visual Studio 解决方案文件。该设备模拟器将生成随机设备数据,这些数据将被输入到 IoT 中心设备中,并由 Spark Structured Streaming 使用。
2. 创建并配置 IoT 中心设备。此服务将连接设备模拟器和 Spark Structured Streaming 服务。
3. 在 Azure 门户中创建 Databricks 服务。Databricks 的 Spark 计算集群将用于 Structured Streaming 过程。也可以使用 Synapse Analytics 进行此过程。
#### 2. 创建 IoT 中心
创建 IoT 中心并添加 IoT 设备后,在 IoT 中心的“内置终结点”部分添加新的使用者组。使用者组用于应用程序从 IoT 中心提取数据,因此在编写 Structured Streaming 代码时,使用可识别的别名会很有用。
#### 3. 创建 Databricks 集群
为了进行本次练习,需要创建一个标准集群。配置完成后,还需要安装 Maven 库。具体步骤如下:
1. 从以下 URL 找到所需的 Maven 库坐标:https://mvnrepository.com/artifact/com.microsoft.azure/azure-eventhubs-spark。
2. 在集群库配置 UI 中的 Maven 库中输入坐标 `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.15`。
3. 安装选定的 Maven 库后,集群将显示“已安装”状态。重启集群以使库正确安装。
#### 4. 创建笔记本并运行 Structured Streaming 查询
接下来,将学习如何在 Databricks 笔记本中实现 Scala 代码,连接到 IoT 中心并启动结构化流。以下是具体步骤:
1. **配置笔记本连接**:
- 创建新的 Databricks 笔记本,并附加安装了 Maven 库的标准集群。使用 Scala 作为代码语言。
- 使用以下代码构建连接字符串并启动结构化流:
```scala
import org.apache.spark.eventhubs._
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf,
EventPosition }
import org.apache.spark.sql.functions.{ explode, split }
// To connect to an Event Hub, EntityPath is required as part of the
connection string.
// Here, we assume that the connection string from the Azure portal does
not have the EntityPath part.
val connectionString = ConnectionStringBuilder("—Event Hub Compatible
Endpoint--")
.setEventHubName("—Event Hub Compatible Name--")
.build
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
.setConsumerGroup("delta")
val eventhubs = spark.readStream
.format("eventhubs")
.options(eventHubsConf.toMap)
.load()
```
- 从 Azure 门户的 IoT 中心的“内置终结点”部分复制“与事件中心兼容的名称”和“与事件中心兼容的终结点”,并替换代码中的占位符。
2. **启动结构化流**:
- 添加以下代码读取流并将数据持久化到 delta 表:
```scala
display(eventhubs)
```
-
0
0
复制全文
相关推荐










