数据工程师的数据编程与查询
立即解锁
发布时间: 2025-08-21 01:26:34 阅读量: 2 订阅数: 19 


Azure数据工程与处理实战指南
### 数据工程师的数据编程与查询
#### 1. 数据处理基础
在数据处理中,若不使用`explode()`方法,直接选择AF3电极数组的全部内容,会得到两列数据。例如:
```python
dfe = df.select('Session.Scenario', 'Session.POWReading.AF3')
```
第一列是场景,第二列包含`brainjammer.json`文件中的所有AF3读数。一个会话大约持续两分钟,该文件中的读数接近1200个,这只是单个电极的数据。
`explode()`方法能以更直观、易读的方式格式化数据,也有助于进行进一步的查询,如查找特定电极的平均值、最大值或最小值。创建Azure Synapse Analytics工作区、配置Spark池并创建笔记本后,就可以执行相关代码,深入了解数据。
#### 2. 数据工程师的编程要求
作为Azure数据工程师,需要编写一些代码。虽然可能不需要深入理解封装、异步模式或并行LINQ查询,但一定的编码技能是必要的。目前主要接触到的是SQL语法和针对Python编程语言的PySpark,后续可能会看到一些C#代码,但示例中主要使用SQL和PySpark。
#### 3. PySpark与Spark
在加载DataFrame或操作数据时,会使用到`%%pyspark`和`spark`等魔法命令。PySpark和Spark的对比如下:
| 对比项 | PySpark | Spark |
| ---- | ---- | ---- |
| 定义 | 允许Python与Spark协作的API | 处理大数据的计算平台 |
| 知识要求 | 需要Python、大数据和Spark知识 | 需要Scala和数据库知识 |
| 实现方式 | 使用用Python编写的Py4j库 | 用Scala编写 |
更多详细信息可访问以下网站:
- PySpark: https://spark.apache.org/docs/latest/api/python/index.html
- Spark: https://spark.apache.org/docs/latest/index.html
- Scala: https://scala-lang.org
#### 4. 数据来源与文件操作
可以从多个位置检索数据,下面介绍如何使用PySpark读取和写入JSON、CSV和parquet文件。
##### 4.1 JSON文件操作
```python
# 读取单个JSON文件
df = spark.read.json('path/brainwaves.json')
# 使用通配符读取多个JSON文件
df = spark.read.json('path/*.json')
# 读取不同位置的多个JSON文件
df = spark.read.json(['AF3/THETA.json', 'T8/ALPHA.json'])
# 使用format和load方法读取JSON文件
df = spark.read.format('json').load('path/')
# 处理多行JSON文件
df = spark.read.option('multiline', 'true').json('pathToJsonFile')
# 指定schema读取JSON文件
df = spark.read.schema('schemaStructure').json('path/brainwaves.json')
# 写入JSON文件
df.write.json('path/brainwave.json')
# 以覆盖模式写入JSON文件
dr.write.mode('overwrite').json('path/brainwave.json')
```
当JSON文件为多行格式时,需要使用`option()`方法将`multiline`选项设置为`true`。若不提供schema,运行时会尝试自行推断,但对于CSV文件,所有内容都会作为字符串类型返回。
定义schema的示例如下:
```python
from pyspark.sql.types import StructType, StructField, StringType, DateTimeType, IntegerType, DecimalType
schema = StructType([
StructField('Scenario', StringType(), True),
StructField('ReadindDate', DateTimeType(), True),
StructField('Counter', IntegerType(), False),
StructField('THETA', DecimalType(), True)
])
```
写入文件时,`mode`参数有不同选项:
- `overwrite`:覆盖同名文件。
- `append`:将新数据添加到同名现有文件末尾。
- `ignore`:若同名文件已存在,则不执行写入操作。
- `errorifexists`:若同名文件已存在,抛出错误。
##### 4.2 CSV文件操作
```python
# 读取单个CSV文件
df = spark.read.csv('path/brainwaves.csv')
# 读取目录下的所有CSV文件
df = spark.read.csv('path/')
# 读取不同位置的多个CSV文件
df = spark.read.csv('AF3/THETA.csv, 'T8/ALPHA.csv)
# 使用format和load方法读取CSV文件
df = spark.read.format('csv').load('path/')
# 读取带表头的CSV文件
df = spark.read.option('header', True).csv('pathToCsvFile')
# 指定schema和表头读取CSV文件
df = spark.read.format('csv') \
.option('header', True) \
.schema('schemaStructure').load('path/brainwaves.csv')
# 写入带表头的CSV文件
df.write.option('header', True).csv('path/brainwave.csv')
# 以错误模式写入CSV文件
dr.write.format(csv).mode('error').save('path/brainwave.csv')
```
处理CSV文件时,有更多选项,如`delimiter`(分隔符)、`inferSchema`(推断数据类型)、`header`(表头)、`quotes`(引号处理)、`nullValues`(空值处理)和`dateFormat`(日期格式)。
##### 4.3 parquet文件操作
```python
# 读取parquet文件
df = spark.read.parquet('path/brainwaves.parquet')
# 使用format和load方法读取parquet文件
df = spark.read.format('parquet').load('path/')
# 写入parquet文件
df.write.parquet('path/brainwave.parquet')
# 以追加模式写入parquet文件
dr.write.mode('append').parquet('path/brainwave.parquet')
```
parquet文件会保留处理时使用的schema,因此读取时使用schema的重要性相对较低。
#### 5. DataFrame操作
可以通过`spark.read.*`方法创建DataFrame,也可以将数据加载到对象中,再使用`createDataFrame()`方法加载到DataFrame。
```python
# 通过路径读取数据到DataFrame
df = spark.read.csv('/tmp/output/brainjammer/reading.csv')
# 将数据加载到对象
data ='abfss://<uid>@<accountName>.dfs.core.windows.net/reading.csv'
# 使用createDataFrame方法加载数据到DataFrame
df = spark.createDataFrame(data, schema)
```
DataFrame加载数据后,有许多可用的操作,以下是一些重要的函数:
##### 5.1 SHOW()
```python
df.show(5, truncate=False, vertical=True)
```
参数说明:
- `n`:指定返回的行数,默认最多返回20行。
- `truncate`:默认值为`True`,只显示列的前20个字符;设置为`False`时,显示整个列值。
- `vertical`:默认值为`False`,设置为`True`时,行和列从上到下依次列出,而不是常见的从左到右的行列对齐方式。
##### 5.2 JOIN()
```p
```
0
0
复制全文
相关推荐










