利用AzureDataFactory玩转DeltaLake与实时异常检测
立即解锁
发布时间: 2025-08-14 01:32:47 阅读量: 11 订阅数: 18 


Azure数据工程实战指南:从入门到精通
### 利用Azure Data Factory玩转Delta Lake与实时异常检测
#### 1. Delta Lake基础与Spark相关问题
在数据处理领域,Spark虽然有内置的数据帧写入器API,但在处理追加操作时虽有一定表现,但并非原子操作,且在使用云存储时会带来性能开销。当前Apache Spark的保存模式有ErrorIfExists、Append、Overwrite和Ignore。同时,在数据处理中还存在ACID相关的问题:
- **一致性(Consistency)**:数据需始终处于有效状态。若Spark API写入器删除旧文件并创建新文件,且操作非事务性,那么在旧文件删除与新文件创建之间会有一段时间文件不存在。若覆盖操作失败,会导致旧文件数据丢失,新文件也可能无法创建,这是典型的Spark覆盖操作一致性问题。
- **隔离性(Isolation)**:多个事务应独立发生且互不干扰。即向数据集写入数据时,对同一数据集的其他并发读写操作不应受此写入操作影响。典型的事务性数据库提供多种隔离级别,如读未提交、读已提交、可重复读、快照和可串行化。但Spark由于缺乏原子性,没有隔离类型。
- **持久性(Durability)**:已提交的数据不应丢失。若Spark未正确实现提交操作,会破坏云存储提供的良好持久性特性,导致数据损坏或丢失,违反数据持久性原则。
#### 2. 开展Delta Lake操作的先决条件
要开始Delta Lake的操作,需完成以下先决条件的创建:
1. **创建Data Factory V2**:Data Factory用于执行ELT编排,其Mapping Data Flows Delta Lake连接器用于创建和管理Delta Lake。
2. **创建Data Lake Storage Gen2**:作为数据湖存储,Delta Lake将在此基础上创建。
3. **创建Data Lake Storage Gen2容器和区域**:创建Data Lake Storage Gen2账户后,创建相应的容器和区域。本练习使用Raw区域存储示例源Parquet文件,Staging区域用于Delta更新、插入、删除和其他转换。Curated区域虽在本练习中未使用,但可能包含最终的ETL、高级分析或数据科学模型。
4. **将数据上传到Raw区域**:可通过在线搜索“sample parquet files”或在公开可用数据集中获取免费的示例Parquet文件,也可从GitHub仓库(https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet)下载示例文件并上传到ADLS Gen2存储账户。
5. **创建指向Raw区域的Data Factory Parquet数据集**:在新创建的ADF V2实例中创建Parquet格式数据集,指向Raw区域存储的示例Parquet文件。
以下是创建先决条件的流程示意:
```mermaid
graph LR
A[创建Data Factory V2] --> B[创建Data Lake Storage Gen2]
B --> C[创建Data Lake Storage Gen2容器和区域]
C --> D[上传数据到Raw区域]
D --> E[创建指向Raw区域的Data Factory Parquet数据集]
```
#### 3. 创建并向Delta Lake插入数据
完成上述先决条件后,即可创建初始Delta表并将Raw区域的数据插入其中,具体步骤如下:
1. **创建新的Data Factory管道并添加Mapping Data Flow**:命名管道和数据流,添加源并配置设置,勾选“Allow schema drift”选项。当字段、列和类型的元数据频繁变化时,会出现模式漂移。若缺乏处理模式漂移的适当流程,ETL管道可能失败。ADF支持经常变化的灵活模式,将模式漂移视为后期绑定,漂移的模式在数据流中不可见。启用模式漂移后,执行时会读取源中的所有传入字段并传递到整个流程的接收器。新检测到的列默认以字符串数据类型到达,若需自动推断漂移列的数据类型,需在源设置中启用“Infer drifted column types”,同时可使用采样方法限制源中的行数,主要用于测试和调试。
2. **配置分区**:由于Delta Lake利用Spark的分布式处理能力可适当分区,为探索手动设置分区的能力,可在ID列上配置20个哈希分区。
3. **设置接收器**:确保接收器类型设置为Azure Data Factory中的Delta格式(https://docs.microsoft.com/en - us/azure/data - factory/format - delta),选择链接服务。在“Settings”选项卡中,选择Staging文件夹,选择“Allow insert”作为更新方法,若需在加载前截断Delta表,可选择“Truncate table”选项。还可进行“vacuum”操作,该操作会删除Delta表不再引用且超过保留阈值(小时)的文件,若值为0或为空,默认保留30天。在“Optimize”选项卡中,使用当前分区,因为源分区会向下游传递到接收器。
当触发管道并运行完成后,会发现创建了13个新列,分布在20个不同分区。查看ADLS Gen2 Staging文件夹,会看到创建了一个delta_log文件夹和20个Snappy压缩的Parquet文件。打开delta_log文件夹可查看两个事务日志文件,这些日志记录了许多重要特性,如ACID事务、可扩展元数据处理、时间旅行等。通过切换ADF中的Data Preview选项卡,可轻松查看Delta Lake中的新插入数据。
#### 4. 更新Delta Lake
完成插入操作后,接下来看看如何使用Data Factory更新Delta表。操作与插入类似,创建一个新的ADF管道并添加用于更新的Mapping Data Flow。具体操作如下:
1. **添加转换活动**:添加Derived column和AlterRow转换活动,将用户的名字和姓氏转换为小写。Derived column转换活动使用表达式lower(first_name)和lower(last_name)实现转换,Mapping Data Flows能处理此阶段的复杂转换。
2. **配置源选项**:源数据仍为之前配置插入操作时的Staging Delta Lake。ADF中的Delta Lake源连接器引入了Delta时间旅行功能,可用于大规模数据湖的数据更改审计、结果重现、回滚处理等,允许按时间戳或版本查询数据。
3. **设置AlterRow条件**:指定“Update if”条件为t
0
0
复制全文
相关推荐









