数据清洗与处理实战指南
立即解锁
发布时间: 2025-09-03 01:44:00 阅读量: 2 订阅数: 13 AIGC 

### 数据清洗与处理实战指南
在数据处理过程中,我们常常会遇到数据格式不一致、需要聚合结果以及提高处理效率等问题。本文将详细介绍如何解决这些问题,包括日期格式标准化、结果聚合以及并行处理数据。
#### 1. 日期格式标准化
日志中的日期时间格式因位置而异,加拿大日志采用标准的 ISO 8601 格式(YYYY-MM-DD),而美国日志使用 MM-DD-YYYY 格式。我们的目标是添加一个新列,使用标准格式统一这两种日期。
##### 1.1 准备工作
- 使用上一个步骤生成的 CSV 文件,其日志格式为 `[<Timestamp>] - SALE - PRODUCT: <product id> - PRICE: <price>`。
- 安装 `parse` 模块,将其添加到 `requirements.txt` 文件中:
```bash
$ echo "parse==1.14.0" >> requirements.txt
$ pip install -r requirements.txt
```
- 从 GitHub 仓库获取日志文件,结构如下:
```
sale_logs/
OH
logs.txt
ON
logs.txt
```
代码可在 [GitHub 仓库](https://github.com/PacktPublishing/Python-Automation-Cookbook-Second-Edition/tree/master/Chapter07) 找到。
##### 1.2 操作步骤
1. 使用 `logs_to_csv.py` 脚本将数据导入并转换为 CSV 文件,添加位置信息:
```bash
$ python logs_to_csv.py sale_logs/OH/logs.txt output_1_OH.csv -l OH
$ python logs_to_csv.py sale_logs/ON/logs.txt output_1_ON.csv -l ON
```
2. 使用 `location_price.py` 脚本处理生成的文件:
```bash
$ python location_price.py output_1_OH.csv output_2_OH.csv
$ python location_price.py output_1_ON.csv output_2_ON.csv
```
3. 使用 `standard_date.py` 脚本处理文件:
```bash
$ python standard_date.py output_2_OH.csv output_3_OH.csv
$ python standard_date.py output_2_ON.csv output_3_ON.csv
```
4. 在电子表格中检查生成的 CSV 文件。
##### 1.3 工作原理
`standard_date.py` 脚本的主要部分如下:
```python
import csv
from datetime import datetime, timezone
def main(input_file, output_file):
reader = csv.DictReader(input_file)
result = [add_std_timestamp(row) for row in reader]
# Save into csv format
header = result[0].keys()
writer = csv.DictWriter(output_file, fieldnames=header)
writer.writeheader()
writer.writerows(result)
def add_std_timestamp(row):
country = row['COUNTRY']
if country == 'USA':
# No change
row['STD_TIMESTAMP'] = american_format(row['TIMESTAMP'])
elif country == 'CANADA':
# No change
row['STD_TIMESTAMP'] = row['TIMESTAMP']
else:
raise Exception('Country not found')
return row
def american_format(timestamp):
'''
Transform from MM-DD-YYYY HH:MM:SS to iso 8601
'''
FORMAT = '%m-%d-%Y %H:%M:%S'
parsed_tmp = datetime.strptime(timestamp, FORMAT)
time_with_tz = parsed_tmp.astimezone(timezone.utc)
isotimestamp = time_with_tz.isoformat()
return isotimestamp
```
该脚本使用 `CSV DictReader` 将每行转换为字典,然后根据国家对日期进行转换,最后使用 `DictWriter` 将结果写入输出文件。
##### 1.4 注意事项
- 时间戳格式检测可能不依赖于国家等其他参数,可能需要尝试多种格式。
- 日志文件中的时间可能存储在不同的时区,需要进行调整。
- 保留旧的时间戳列,以便后续参考和问题检测。
#### 2. 结果聚合
数据清洗完成后,我们可以处理结果。这里我们将计算按位置和日期的平均销售价格以及总销售额。
##### 2.1 准备工作
- 使用上一个步骤生成的 CSV 文件。
- 安装 `parse` 和 `delorean` 模块,将其添加到 `requirements.txt` 文件中:
```bash
$ echo "parse==1.14.0" >> requirements.txt
$ echo "delorean==1.0.0" >> requirements.txt
$ pip install -r requirements.txt
```
- 从 GitHub 仓库获取日志文件,结构同上。
##### 2.2 操作步骤
1. 使用 `logs_to_csv.py` 脚本将数据导入并转换为 CSV 文件,添加位置信息。
2. 使用 `location_price.py` 脚本处理生成的文件。
3. 使用 `standard_date.py` 脚本处理文件。
4. 使用 `aggregate_by_location.py` 脚本处理生成的文件:
```bash
$ python aggregate_by_location.py output_3_ON.csv aggregate_ON.csv
$ python aggregate_by_location.py output_3_OH.csv aggregate_OH.csv
```
5. 在电子表格中检查生成的 CSV 文件。
##### 2.3 工作原理
`aggregate_by_location.py` 脚本的主要部分如下:
```python
from decimal import Decimal
import delorean
import csv
def calculate_results(reader):
result = []
last_date = None
total_usd = 0
number = 0
for row in reader:
date = parse_iso(row['STD_TIMESTAMP'])
if not last_date:
last_date =
```
0
0
复制全文
相关推荐









