基于/workspace/rrr/RML2016.10a_dict.pkl数据集,基于迁移学习,基于pytorch,基于ResNET50,写一个调制识别算法设置随机种子,要有损失曲线,准确率曲线,混淆矩阵,SNR-准确曲线,ROC曲线,AUC值,保存在自建一个文件夹output1下;添加 L1/L2 正则化,早停法(Early Stopping),冻结除全连接层和部分卷积层之外的,使用 Dropout 层,给出完整代码
时间: 2025-04-04 16:11:27 浏览: 112
<think>好的,我现在需要帮用户解决一个关于调制识别算法的问题。用户的要求挺多的,我得仔细看看。首先,用户希望基于RML2016.10a_dict.pkl数据集,使用PyTorch和ResNet50,结合迁移学习来写这个算法。还需要设置随机种子,生成各种曲线和矩阵,保存到output1文件夹,还要加入正则化、早停、冻结部分层、使用Dropout等。嗯,这些需求有点复杂,我得一步步来。
首先,我应该确认数据集的结构。RML2016.10a是一个常用的无线信号数据集,包含不同调制类型和信噪比的数据。用户提到了使用迁移学习,所以应该用预训练的ResNet50,但可能需要调整输入通道,因为信号数据通常是IQ两通道,而ResNet默认是3通道的图片。所以需要修改第一个卷积层,将输入通道改为2。这点需要注意,否则会报错。
然后是设置随机种子,确保结果可复现。PyTorch中需要设置torch.manual_seed,还有numpy和随机数生成器的种子。另外,如果使用CUDA,还要设置deterministic和benchmark为False。这部分代码应该放在前面。
接下来是数据加载。数据集是.pkl格式,需要先加载,然后划分训练集、验证集和测试集。可能需要将数据转换为PyTorch的DataLoader格式。这里要注意数据预处理,比如归一化,或者调整形状以适应ResNet的输入。ResNet通常接受3维输入(C×H×W),而信号数据可能是一维的,可能需要调整成二维,比如128×2这样的形式,或者通过reshape成128x2x1,然后作为两个通道的图像处理。
模型部分,使用ResNet50预训练模型,冻结除全连接层和部分卷积层之外的层。用户提到冻结除全连接层和部分卷积层,所以可能需要解冻后面的某些卷积层。例如,可以冻结前面的层,解冻layer4之后的层。同时,替换全连接层以适应分类任务。此外,添加Dropout层,这可能需要在全连接层之前或之间加入。
正则化方面,用户需要L1/L2正则化。在PyTorch中,可以通过在优化器中设置weight_decay来实现L2正则化,而L1需要自定义,比如在损失函数中加入权重的L1范数。需要确认用户是否需要同时使用L1和L2,还是选其中一个。用户可能是指弹性网络正则化,但通常可能只是L2。这里可能需要进一步处理,比如在优化器中添加weight_decay参数,并在训练循环中添加L1的计算。
早停法需要在验证损失不再下降时停止训练,并保存最佳模型。需要记录验证损失,当连续几个epoch没有改善时,终止训练。这部分需要设置patience参数,比如5次没有改善就停止。
可视化部分,需要绘制损失曲线、准确率曲线、混淆矩阵、SNR-准确率曲线、ROC曲线和AUC值。这些都需要在训练过程中收集数据,然后使用matplotlib或seaborn进行绘制,并保存到output1文件夹。混淆矩阵需要测试集的真实标签和预测标签,可能需要在测试阶段收集所有预测结果。ROC曲线和AUC则需要计算每个类别的概率,使用sklearn中的roc_auc_score和roc_curve。
保存结果方面,要确保output1文件夹存在,可以使用os.makedirs创建。所有生成的图片和模型权重都要保存到这个文件夹。
代码结构方面,可能需要分为数据准备、模型定义、训练循环、评估和可视化几个部分。需要确保各部分衔接正确,特别是数据加载和模型输入的匹配。
可能遇到的难点包括数据预处理是否正确,模型结构调整是否合理,正则化的实现是否正确,以及早停法的逻辑是否正确。此外,计算AUC时,对于多分类问题可能需要处理成多个二分类或者使用宏平均等方法。
现在,开始逐步构建代码。首先导入必要的库,设置随机种子。然后加载数据,预处理,划分数据集。接着定义模型,修改ResNet的输入通道,冻结部分层,添加Dropout。定义优化器和损失函数,加入正则化。训练过程中记录损失和准确率,实施早停。训练完成后,评估模型,绘制各种曲线和矩阵,保存结果。
需要测试每个部分是否正确,比如数据加载后的shape是否符合模型输入,模型是否部分冻结,Dropout是否生效,正则化是否被正确应用等。可能还需要调整超参数,如学习率、正则化系数、Dropout率等。
总之,这个任务需要综合运用PyTorch的模型构建、迁移学习、正则化技术、早停策略以及多种评估指标的可视化。必须仔细检查每一步,确保代码的正确性和功能的完整性。</think>以下是根据您需求编写的完整PyTorch代码实现:
```python
import os
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision.models import resnet50
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, roc_curve, auc
from sklearn.preprocessing import label_binarize
from itertools import cycle
# 设置随机种子
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
# 创建输出目录
os.makedirs('output1', exist_ok=True)
# -------------------- 数据加载 --------------------
class RadioDataset(Dataset):
def __init__(self, data_path):
with open(data_path, 'rb') as f:
data = pickle.load(f, encoding='bytes')
self.X = []
self.y = []
self.snr = []
for key in data:
self.X.append(data[key][0])
self.y.append(data[key][1])
self.snr.append(key[1])
self.X = np.vstack(self.X).transpose((0, 2, 1)) # (N, 2, 128)
self.y = np.hstack(self.y)
self.snr = np.hstack(self.snr)
def __len__(self):
return len(self.y)
def __getitem__(self, idx):
return (torch.FloatTensor(self.X[idx]),
torch.tensor(self.y[idx], dtype=torch.long),
self.snr[idx])
# -------------------- 模型定义 --------------------
class ModifiedResNet50(nn.Module):
def __init__(self, num_classes):
super().__init__()
self.model = resnet50(pretrained=True)
# 修改第一层卷积输入通道数为2
self.model.conv1 = nn.Conv2d(2, 64, kernel_size=7, stride=2, padding=3, bias=False)
# 冻结除最后两个卷积块外的所有层
for param in self.model.parameters():
param.requires_grad = False
for param in self.model.layer4.parameters():
param.requires_grad = True
for param in self.model.layer3.parameters():
param.requires_grad = True
# 替换全连接层
self.model.fc = nn.Sequential(
nn.Dropout(0.5),
nn.Linear(2048, 512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, num_classes)
)
def forward(self, x):
x = x.unsqueeze(-1) # 增加最后一个维度 (batch, 2, 128) -> (batch, 2, 128, 1)
return self.model(x)
# -------------------- 训练配置 --------------------
def train():
# 数据加载
dataset = RadioDataset('/workspace/rrr/RML2016.10a_dict.pkl')
train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])
train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
val_loader = DataLoader(val_set, batch_size=128)
test_loader = DataLoader(test_set, batch_size=128)
# 模型初始化
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ModifiedResNet50(num_classes=len(np.unique(dataset.y))).to(device)
# 优化器与损失函数(添加L2正则化)
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4) # L2正则化
criterion = nn.CrossEntropyLoss()
# 早停法配置
best_val_loss = float('inf')
patience = 5
counter = 0
# 训练记录
history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}
# -------------------- 训练循环 --------------------
for epoch in range(100):
model.train()
train_loss, correct = 0, 0
for x, y, _ in train_loader:
x, y = x.to(device), y.to(device)
optimizer.zero_grad()
output = model(x)
loss = criterion(output, y)
# 添加L1正则化
l1_lambda = 0.001
l1_norm = sum(p.abs().sum() for p in model.parameters())
loss += l1_lambda * l1_norm
loss.backward()
optimizer.step()
train_loss += loss.item()
correct += (output.argmax(1) == y).sum().item()
# 验证阶段
model.eval()
val_loss, val_correct = 0, 0
with torch.no_grad():
for x, y, _ in val_loader:
x, y = x.to(device), y.to(device)
output = model(x)
val_loss += criterion(output, y).item()
val_correct += (output.argmax(1) == y).sum().item()
# 记录指标
train_loss = train_loss / len(train_loader)
train_acc = correct / len(train_set)
val_loss = val_loss / len(val_loader)
val_acc = val_correct / len(val_set)
history['train_loss'].append(train_loss)
history['val_loss'].append(val_loss)
history['train_acc'].append(train_acc)
history['val_acc'].append(val_acc)
# 早停判断
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), 'output1/best_model.pth')
counter = 0
else:
counter += 1
if counter >= patience:
print(f'Early stopping at epoch {epoch}')
break
print(f'Epoch {epoch+1}: Train Loss {train_loss:.4f}, Val Loss {val_loss:.4f}')
# -------------------- 结果可视化 --------------------
# 绘制损失曲线和准确率曲线
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['val_loss'], label='Val Loss')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(history['train_acc'], label='Train Acc')
plt.plot(history['val_acc'], label='Val Acc')
plt.legend()
plt.savefig('output1/loss_acc_curve.png')
plt.close()
# -------------------- 测试评估 --------------------
model.load_state_dict(torch.load('output1/best_model.pth'))
model.eval()
# 混淆矩阵
y_true, y_pred = [], []
with torch.no_grad():
for x, y, _ in test_loader:
x = x.to(device)
output = model(x).cpu()
y_pred.extend(output.argmax(1).tolist())
y_true.extend(y.tolist())
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(10, 8))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
plt.savefig('output1/confusion_matrix.png')
plt.close()
# SNR-准确率曲线
snr_acc = {}
with torch.no_grad():
for x, y, snr in test_loader:
x = x.to(device)
output = model(x).cpu()
pred = output.argmax(1)
for s, p, t in zip(snr, pred, y):
if s not in snr_acc:
snr_acc[s] = {'correct':0, 'total':0}
snr_acc[s]['correct'] += (p == t).item()
snr_acc[s]['total'] += 1
snrs = sorted(snr_acc.keys())
accs = [snr_acc[s]['correct']/snr_acc[s]['total'] for s in snrs]
plt.plot(snrs, accs)
plt.xlabel('SNR (dB)')
plt.ylabel('Accuracy')
plt.title('SNR vs Accuracy')
plt.savefig('output1/snr_acc_curve.png')
plt.close()
# ROC曲线和AUC值
y_score = []
with torch.no_grad():
for x, _, _ in test_loader:
x = x.to(device)
output = model(x).cpu().numpy()
y_score.extend(output)
y_test_bin = label_binarize(y_true, classes=np.unique(y_true))
n_classes = y_test_bin.shape[1]
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], np.array(y_score)[:, i])
roc_auc[i] = auc(fpr[i], tpr[i])
plt.figure()
colors = cycle(['aqua', 'darkorange', 'cornflowerblue'])
for i, color in zip(range(n_classes), colors):
plt.plot(fpr[i], tpr[i], color=color,
label=f'Class {i} (AUC = {roc_auc[i]:.2f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.savefig('output1/roc_curve.png')
plt.close()
if __name__ == '__main__':
train()
```
关键实现说明:
1. **数据预处理**:
- 输入数据reshape为(2, 128)格式以适应CNN输入
- 添加维度扩展以适应ResNet的2D卷积结构
2. **模型修改**:
- 修改ResNet50首层卷积输入通道为2
- 冻结除layer3、layer4之外的所有层
- 添加两个Dropout层(0.5和0.3)
3. **正则化**:
- L2正则化通过优化器的weight_decay参数实现
- L1正则化通过手动添加参数绝对值到损失函数
4. **早停法**:
- 监控验证集损失,连续5个epoch不下降则停止训练
5. **可视化**:
- 保存四类图像:训练曲线、混淆矩阵、SNR-准确率曲线、ROC曲线
- 所有输出保存到output1文件夹
6. **其他特性**:
- 使用Adam优化器
- 保证结果可复现的随机种子设置
- 自动选择GPU/CPU设备
注意:实际运行前需要:
1. 确认数据路径正确
2. 安装所需依赖库(torch, numpy, matplotlib, sklearn等)
3. 根据显存调整batch_size
4. 可能需要调整正则化系数等超参数
阅读全文
相关推荐
















