import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch_geometric.nn import GATv2Conv
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import warnings
# Use the SimHei font so Chinese characters render in matplotlib figures.
plt.rcParams["font.family"] = ["SimHei"]
# SimHei has no glyph for the Unicode minus sign (U+2212) that matplotlib uses
# by default; fall back to the ASCII hyphen so negative tick labels are not
# drawn as empty boxes.
plt.rcParams["axes.unicode_minus"] = False
# NOTE(review): this silences every warning process-wide, including
# deprecation and numerical warnings -- consider narrowing the filter.
warnings.filterwarnings('ignore')
# Seed every random number generator in use so that runs are repeatable.
def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Local import: `random` is not part of the file-level imports.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds all CUDA devices; the call is ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    # Trade cuDNN auto-tuning speed for run-to-run reproducibility.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
set_seed()
# Device selection: run on the GPU when CUDA is available, else on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"使用设备: {device}")
# Hyper-parameters and data-split settings.
class Config:
    """Container for the model, training and data-split settings."""

    def __init__(self):
        self.window_size = 24  # input window size (length of the history)
        self.pred_size = 12  # prediction window size (future horizon)
        self.batch_size = 32  # mini-batch size
        self.epochs = 100  # number of training epochs
        self.learning_rate = 0.001  # optimizer learning rate
        self.hidden_dim = 64  # hidden dimension
        self.num_heads = 4  # number of attention heads
        self.num_layers = 2  # number of GNN layers
        self.dropout = 0.2  # dropout rate
        self.train_ratio = 0.7  # training split ratio
        self.val_ratio = 0.15  # validation split ratio
        self.test_ratio = 0.15  # test split ratio
        # Number of graph nodes. GNNTransformer.forward reads this attribute;
        # it is not known until the data is loaded, so it starts as None and
        # must be assigned by the caller.
        self.num_nodes = None
# Data loading and preprocessing.
class TrafficDataset(Dataset):
    """Sliding-window dataset over a traffic-flow series.

    Sample ``i`` pairs the ``window_size`` rows starting at ``i`` (the input)
    with the ``pred_size`` rows that immediately follow (the target).
    """

    def __init__(self, data, window_size, pred_size):
        """Store the series and the window lengths.

        Args:
            data: Standardized flow data, documented by the original author
                as shaped [time steps, num nodes].
            window_size: Number of rows in each input window.
            pred_size: Number of rows in each target window.
        """
        self.data = data
        self.window_size = window_size
        self.pred_size = pred_size

    def __len__(self):
        """Return the number of complete (input, target) windows.

        Clamped at zero: a series shorter than one full window would
        otherwise yield a negative length.
        """
        return max(0, len(self.data) - self.window_size - self.pred_size + 1)

    def __getitem__(self, idx):
        """Return the ``(x, y)`` float32 tensors for window ``idx``.

        Returns:
            x: rows ``[idx, idx + window_size)`` of the series.
            y: the ``pred_size`` rows right after ``x``.

        Raises:
            IndexError: if ``idx`` is outside ``[-len(self), len(self))``.
        """
        num_samples = len(self)
        if idx < 0:
            idx += num_samples
        # Slicing never fails on its own, so without this check an
        # out-of-range index returns a truncated window and plain iteration
        # over the dataset never stops.
        if not 0 <= idx < num_samples:
            raise IndexError(
                f"index {idx} is out of range for {num_samples} samples"
            )
        split = idx + self.window_size
        x = self.data[idx:split]
        y = self.data[split:split + self.pred_size]
        return (
            torch.as_tensor(x, dtype=torch.float32),
            torch.as_tensor(y, dtype=torch.float32),
        )
# Build the graph structure (based on correlation between OD pairs).
def build_od_graph(data, threshold=0.6):
    """Build an undirected graph from pairwise correlations between nodes.

    Two nodes are linked when the absolute Pearson correlation of their
    series exceeds ``threshold``. When that yields fewer directed edges than
    there are nodes, the edges of a minimum spanning tree over the distance
    ``1 - |corr|`` are added so the graph is connected.

    Args:
        data: Flow data, documented by the original author as shaped
            [time steps, num nodes].
        threshold: Absolute-correlation cut-off for creating an edge.

    Returns:
        A ``torch.long`` tensor of shape [2, num_directed_edges]. Every
        undirected edge appears as the adjacent columns (i, j) and (j, i).
        The shape is [2, 0] when no edge exists.
    """
    # Correlation between nodes: [num_nodes, num_nodes]. np.corrcoef collapses
    # to a scalar for a single node, hence atleast_2d.
    with np.errstate(divide="ignore", invalid="ignore"):
        corr_matrix = np.atleast_2d(np.corrcoef(np.asarray(data).T))
    # A constant series has zero variance and correlates as NaN; treat it as
    # uncorrelated so it cannot poison the distance matrix below.
    corr_matrix = np.nan_to_num(corr_matrix, nan=0.0)
    abs_corr = np.abs(corr_matrix)
    num_nodes = abs_corr.shape[0]

    # Upper-triangle pairs (i < j) above the threshold, in row-major order.
    strong = np.triu(abs_corr > threshold, k=1)
    pairs = [tuple(p) for p in np.argwhere(strong).tolist()]

    # Too few edges: add a minimum spanning tree to keep the graph connected.
    # The comparison counts directed edges (two per pair), as the original did.
    if 2 * len(pairs) < num_nodes:
        print(f"相关性阈值 {threshold} 过高,边数不足,创建最小生成树...")
        if num_nodes > 1:
            from scipy.sparse.csgraph import minimum_spanning_tree
            from scipy.sparse import csr_matrix

            # Convert correlation to distance. Off-diagonal distances are
            # kept strictly positive because the sparse conversion drops
            # exact zeros, which would remove perfectly correlated pairs
            # from the candidate edges.
            dist_matrix = np.clip(1.0 - abs_corr, np.finfo(float).eps, None)
            np.fill_diagonal(dist_matrix, 0.0)
            mst = minimum_spanning_tree(csr_matrix(dist_matrix))
            rows, cols = mst.nonzero()

            # Skip tree edges that the threshold pass already produced.
            seen = set(pairs)
            for i, j in zip(rows.tolist(), cols.tolist()):
                pair = (min(i, j), max(i, j))
                if pair not in seen:
                    seen.add(pair)
                    pairs.append(pair)

    # Emit both directions of every pair (undirected graph).
    directed = [edge for i, j in pairs for edge in ((i, j), (j, i))]
    # reshape(-1, 2) keeps the result two-dimensional when there are no edges.
    edge_index = (
        torch.tensor(directed, dtype=torch.long).reshape(-1, 2).t().contiguous()
    )
    print(f"构建的图包含 {edge_index.shape[1] // 2} 条边")
    return edge_index
# GNN-Transformer model.
class GNNTransformer(nn.Module):
    """Stacked GATv2 layers, then a Transformer encoder, then an MLP head.

    The GAT layers mix information between graph nodes, the Transformer
    encoder attends across the nodes of each sample, and the MLP head maps
    every node embedding to ``config.pred_size`` future steps.
    """

    def __init__(self, config):
        """Build the layers from ``config``.

        Reads ``window_size``, ``hidden_dim``, ``num_heads``, ``num_layers``,
        ``dropout`` and ``pred_size`` from ``config``.
        """
        super().__init__()
        self.config = config
        # Width of a node embedding after a GAT layer (heads are concatenated).
        embed_dim = config.hidden_dim * config.num_heads

        # GNN layers. Input layer: window_size -> hidden_dim * num_heads.
        self.gnn_layers = nn.ModuleList()
        self.gnn_layers.append(
            GATv2Conv(config.window_size, config.hidden_dim,
                      heads=config.num_heads, dropout=config.dropout)
        )
        # Remaining GNN layers keep the embedding width.
        for _ in range(config.num_layers - 1):
            self.gnn_layers.append(
                GATv2Conv(embed_dim, config.hidden_dim,
                          heads=config.num_heads, dropout=config.dropout)
            )

        # Transformer encoder over the node axis.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=config.num_heads,
            dim_feedforward=config.hidden_dim * 4,
            dropout=config.dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)

        # Output head: map the embedding to the prediction length.
        self.output_layer = nn.Sequential(
            nn.Linear(embed_dim, config.hidden_dim),
            nn.ReLU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.hidden_dim, config.pred_size)
        )

    @staticmethod
    def _expand_edge_index(edge_index, batch_size, num_nodes):
        """Replicate a single-graph ``edge_index`` once per sample in the batch.

        Copy ``b`` is shifted by ``b * num_nodes`` so that it addresses the
        rows of sample ``b`` in the flattened node tensor. The input is
        returned unchanged when there is one sample, when there are no edges,
        or when it already addresses rows beyond the first sample.
        """
        if batch_size == 1 or edge_index.numel() == 0:
            return edge_index
        # NOTE: reading the max forces a host sync when edge_index is on GPU.
        if int(edge_index.max()) >= num_nodes:
            return edge_index
        offsets = torch.arange(batch_size, device=edge_index.device) * num_nodes
        # [2, 1, E] + [1, B, 1] -> [2, B, E] -> [2, B * E]
        expanded = edge_index.unsqueeze(1) + offsets.view(1, -1, 1)
        return expanded.reshape(2, -1)

    def forward(self, x, edge_index, batch=None):
        """Predict the next ``pred_size`` steps for every node.

        Args:
            x: Input features, shaped [batch_size * num_nodes, window_size].
            edge_index: Edge indices, shaped [2, num_edges]. A single-graph
                index is replicated for every sample in the batch.
            batch: Batch assignment per row, shaped [batch_size * num_nodes];
                ``None`` means a single sample.

        Returns:
            Tensor shaped [batch_size, pred_size, num_nodes].

        Raises:
            ValueError: if the row count of ``x`` is not a multiple of the
                batch size.
        """
        batch_size = len(torch.unique(batch)) if batch is not None else 1
        total_rows = x.size(0)
        if total_rows % batch_size != 0:
            raise ValueError(
                f"cannot split {total_rows} rows evenly into {batch_size} samples"
            )
        # Derived from the input instead of config.num_nodes, which Config
        # does not define.
        num_nodes = total_rows // batch_size

        # Without this, a single-graph edge_index only connects the rows of
        # the first sample and the rest of the batch gets no message passing.
        edge_index = self._expand_edge_index(edge_index, batch_size, num_nodes)

        # GNN stage: activation and dropout between layers, not after the last.
        last_layer = len(self.gnn_layers) - 1
        for i, gnn_layer in enumerate(self.gnn_layers):
            x = gnn_layer(x, edge_index)
            if i < last_layer:
                x = F.elu(x)
                x = F.dropout(x, p=self.config.dropout, training=self.training)

        # Reshape to [batch_size, num_nodes, hidden_dim * num_heads].
        x = x.reshape(batch_size, num_nodes, -1)
        # Transformer stage.
        x = self.transformer_encoder(x)
        # Output head: [batch_size, num_nodes, pred_size].
        x = self.output_layer(x)
        # Transpose to [batch_size, pred_size, num_nodes] to match the target.
        return x.permute(0, 2, 1)
# 训练函数
def train_model(model, train_loader, val_loader, edge_index, optimizer, criterion, config, device):
train_losses = []
val_losses = []
best_val_loss = float('inf')
best_model = None
edge_index = edge_index.to(device)
for epoch in range(config.epochs):
# 训练阶段
model.train()
train_loss = 0
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device) # [batch, window, nodes]
# 重塑数据以适应GNN输入
batch_size, window_size, num_nodes = data.shape
data = data.permute(0, 2, 1).reshape(-1, window_size) # [batch*nodes, window]
# 创建批次索引
batch_indices = torch.arange(batch_size, device=device).repeat_interleave(num_nodes)
optimizer.zero_grad()
output = model(data, edge_index, batch_indices) # [batch, pred, nodes]
loss = criterion(output, target)
loss.backward()
# 梯度裁剪防止梯度爆炸
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
train_loss += loss.item()
# 每10个批次打印一次信息
if (batch_idx + 1) % 10 == 0:
print(
f'Epoch

甜辣uu
- 粉丝: 1w+
最新资源
- 计算机信息安全技术专业优秀教学计划安排.doc
- 智能家居系统平台分析-智建社区.docx
- 西门子PLC控制的五层电梯系统.doc
- 基于单片机温度测试系统硬件设计.doc
- 互联网背景下新媒体广告的传播方式及营销策略.docx
- 魔兽世界网络游戏推广营销方案.doc
- 互联网+背景下社区新媒体创新模式探究.docx
- 5G网络的端到端客户感知评估方法.docx
- 数控铣床FANUC系统编程代码.doc
- php页面静态化学习笔记.doc
- 基于Socket聊天室(C#版).doc
- 提升中学教师教育信息化素养策略初探.docx
- 中南大学物联网工程专业培养技术方案.doc
- 信息化时代医院图书馆精细化管理与服务质量提升.docx
- 综合布线工程企业全面预算管理取得实效的关键因素分析.docx
- 技术理性批判视域下的网络化办公分析.docx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈


