### Dissecting a PyTorch FeedForward Module Implementation

The feed-forward network (FeedForward Network) is one of the core components of the Transformer architecture. A PyTorch implementation consists of the following key parts:

#### 1. Module structure
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        # First linear layer: expand to the hidden dimension
        self.linear1 = nn.Linear(d_model, d_ff)
        # Second linear layer: project back to the original dimension
        self.linear2 = nn.Linear(d_ff, d_model)
        # Dropout layer to reduce overfitting
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Input shape: (batch_size, seq_len, d_model)
        x = self.linear1(x)      # -> (batch_size, seq_len, d_ff)
        x = F.gelu(x)            # GELU activation
        x = self.dropout(x)      # apply dropout
        return self.linear2(x)   # -> back to (batch_size, seq_len, d_model)
```

#### 2. Core components
1. **Dimension transformation**:
   - `d_model`: input/output dimension (e.g. 512)
   - `d_ff`: hidden dimension (typically `4*d_model`)
   - Expanding the width ($d_{model} \rightarrow d_{ff}$) increases the model's representational capacity
2. **GELU activation**:
   - Definition: $$\text{GELU}(x) = x\,\Phi(x) = x \cdot \frac{1}{2}\left[1 + \text{erf}\left(\frac{x}{\sqrt{2}}\right)\right]$$
   - Smoother than ReLU and lets small negative inputs pass, which tends to give more stable gradients
   - Implementation: call `F.gelu()` directly (built into PyTorch ≥ 1.6)[^2]; a quick numerical check appears at the end of this section
3. **Dropout layer**:
   - Randomly zeroes a fraction of activations (0.1 by default in this module)
   - Reduces overfitting and improves generalization
   - Active only during training (automatically disabled in eval mode)

#### 3. Usage example
```python
# Hyperparameters
d_model = 512    # model dimension
d_ff = 2048      # hidden dimension (4x expansion)
dropout = 0.1

# Instantiate the module
ffn = FeedForward(d_model, d_ff, dropout)

# Simulated input (batch_size=2, seq_len=10)
x = torch.randn(2, 10, d_model)

# Forward pass
output = ffn(x)  # output shape remains (2, 10, 512)
```

#### 4. Design rationale
1. **Non-linearity**: GELU supplies the non-linear capacity the network needs to learn complex patterns
2. **Width expansion**: $d_{ff} > d_{model}$ increases model capacity (an expand-then-contract structure)
3. **Regularization**: Dropout randomly masks activations during training to prevent overfitting
4. **Residual connection**: added outside this module in practice, where the module's output is summed with its input[^2] (a minimal sketch follows at the end of this section)

> This implementation follows the standard Transformer architecture and is used in BERT, GPT, and related models. GELU's smooth profile makes it well suited to the continuous feature distributions found in natural language[^2].
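As a quick sanity check on the GELU definition above, the short snippet below (an illustrative addition, not part of the original write-up) compares PyTorch's built-in `F.gelu` against the erf-based formula:

```python
import torch
import torch.nn.functional as F

x = torch.linspace(-3, 3, steps=7)

# GELU(x) = x * Phi(x), with the standard normal CDF written via the error function
manual = x * 0.5 * (1.0 + torch.erf(x / 2 ** 0.5))

# PyTorch's default (exact) GELU should agree up to floating-point error
print(torch.allclose(F.gelu(x), manual, atol=1e-6))  # expected: True
```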
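The design notes above point out that the residual connection and layer normalization live outside the FeedForward module. As an illustration, here is a minimal pre-norm sub-layer wrapper; the name `FFNBlock` and the pre-norm ordering are assumptions made for this sketch rather than something the article prescribes, and it assumes the `FeedForward` class from section 1 is in scope:

```python
import torch
import torch.nn as nn

class FFNBlock(nn.Module):
    """Pre-norm feed-forward sub-layer: x + Dropout(FFN(LayerNorm(x)))."""
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)
        # FeedForward is the class defined in section 1 above
        self.ffn = FeedForward(d_model, d_ff, dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # The residual path leaves the input untouched; only the branch is normalized,
        # passed through the FFN, and dropped out before being added back
        return x + self.dropout(self.ffn(self.norm(x)))

# Same shape in, same shape out
block = FFNBlock(d_model=512, d_ff=2048)
out = block(torch.randn(2, 10, 512))  # (2, 10, 512)
```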
RuntimeError: CUDA error: device-side assert triggered CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect. For debugging consider passing CUDA_LAUNCH_BLOCKING=1 Compile with TORCH_USE_CUDA_DSA to enable device-side assertions. !pip install transformers datasets torch rouge-score matplotlib import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from transformers import BertTokenizerFast import time import numpy as np from datasets import load_dataset from rouge_score import rouge_scorer import matplotlib.pyplot as plt from IPython.display import clear_output # 设备配置 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"使用设备: {device}") # 数据预处理(严格过滤无效样本) class SummaryDataset(Dataset): def __init__(self, dataset_split, tokenizer, max_article_len=384, max_summary_len=96, subset_size=0.01): self.tokenizer = tokenizer self.max_article_len = max_article_len self.max_summary_len = max_summary_len self.subset = dataset_split.select(range(int(len(dataset_split) * subset_size))) # 严格过滤无效样本 self.articles = [] self.summaries = [] self.vocab = set(tokenizer.vocab.keys()) for item in self.subset: article = item['article'].strip() summary = item['highlights'].strip() if len(article) > 20 and len(summary) > 10: article_tokens = tokenizer.tokenize(article) summary_tokens = tokenizer.tokenize(summary) if all(t in self.vocab for t in article_tokens) and all(t in self.vocab for t in summary_tokens): self.articles.append(article) self.summaries.append(summary) self.pad_token_id = tokenizer.pad_token_id self.unk_token_id = tokenizer.unk_token_id def __len__(self): return len(self.articles) def __getitem__(self, idx): src = self.tokenizer( self.articles[idx], max_length=self.max_article_len, truncation=True, padding='max_length', return_tensors='pt', add_special_tokens=True ) tgt = self.tokenizer( self.summaries[idx], max_length=self.max_summary_len, truncation=True, padding='max_length', return_tensors='pt', add_special_tokens=True ) tgt_labels = tgt['input_ids'].squeeze() tgt_labels[tgt_labels == self.pad_token_id] = -100 # 忽略填充 tgt_labels[tgt_labels >= len(self.tokenizer.vocab)] = self.unk_token_id # 过滤无效id return { 'input_ids': src['input_ids'].squeeze(), 'attention_mask': src['attention_mask'].squeeze(), 'labels': tgt_labels } # 基础Seq2Seq模型 class BasicEncoder(nn.Module): def __init__(self, vocab_size, emb_dim=128, hidden_dim=256): super().__init__() self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0) self.gru = nn.GRU(emb_dim, hidden_dim, num_layers=2, batch_first=True, bidirectional=True) self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim) def forward(self, src): embedded = self.embedding(src) outputs, hidden = self.gru(embedded) # 取第二层双向隐藏状态 forward_hidden = hidden[-2, :, :] # 第二层正向 backward_hidden = hidden[-1, :, :] # 第二层反向 hidden = torch.cat([forward_hidden, backward_hidden], dim=1) # (batch, 2*hidden_dim) hidden = self.fc_hidden(hidden).unsqueeze(0) # (1, batch, hidden_dim) return hidden class BasicDecoder(nn.Module): def __init__(self, vocab_size, emb_dim=128, hidden_dim=256): super().__init__() self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0) self.gru = nn.GRU(emb_dim + hidden_dim, hidden_dim, num_layers=1, batch_first=True) self.fc = nn.Linear(hidden_dim * 2 + emb_dim, vocab_size) def forward(self, input_ids, hidden, context): input_embedded = self.embedding(input_ids.unsqueeze(1)) # (batch, 1, emb_dim) input_combined = 
torch.cat([input_embedded, context.unsqueeze(1)], dim=2) # (batch, 1, emb_dim+hidden_dim) output, hidden = self.gru(input_combined, hidden) # (batch, 1, hidden_dim) output = output.squeeze(1) # (batch, hidden_dim) combined = torch.cat([output, context, input_embedded.squeeze(1)], dim=1) # (batch, 2*hidden_dim+emb_dim) logits = self.fc(combined) return logits, hidden class BasicSeq2Seq(nn.Module): def __init__(self, vocab_size, emb_dim=128, hidden_dim=256): super().__init__() self.encoder = BasicEncoder(vocab_size, emb_dim, hidden_dim) self.decoder = BasicDecoder(vocab_size, emb_dim, hidden_dim) self.device = device self.sos_token_id = 101 # [CLS] self.eos_token_id = 102 # [SEP] self.unk_token_id = 100 # [UNK] def forward(self, src, tgt): hidden = self.encoder(src) context = hidden.squeeze(0) batch_size, tgt_len = tgt.size() outputs = torch.zeros(batch_size, tgt_len, self.decoder.fc.out_features).to(device) input_ids = tgt[:, 0] for t in range(1, tgt_len): logits, hidden = self.decoder(input_ids, hidden, context) outputs[:, t] = logits input_ids = tgt[:, t] return outputs def generate(self, src, max_length=80): src = src.to(device) hidden = self.encoder(src) context = hidden.squeeze(0) # 修正后的生成初始化 generated = torch.full((src.size(0), 1), self.sos_token_id, device=device) # 注意这里的修正 for _ in range(max_length-1): logits, hidden = self.decoder(generated[:, -1], hidden, context) next_token = torch.argmax(logits, dim=1, keepdim=True) # 防止过早生成标点 if generated.size(1) < 5: punctuation = [',', '.', ';', ':', '!', '?', "'", '"', '', '~'] punct_ids = [self.tokenizer.convert_tokens_to_ids(p) for p in punctuation] if next_token.item() in punct_ids: # 替换为最常见的实词 next_token = torch.tensor([[self.tokenizer.convert_tokens_to_ids('the')]], device=device) generated = torch.cat([generated, next_token], dim=1) if (next_token == self.eos_token_id).all(): break return generated # 注意力Seq2Seq模型 class Attention(nn.Module): def __init__(self, hidden_dim): super().__init__() self.W = nn.Linear(2 * hidden_dim, hidden_dim) self.v = nn.Linear(hidden_dim, 1, bias=False) def forward(self, hidden, encoder_outputs): src_len = encoder_outputs.size(1) hidden = hidden.unsqueeze(1).repeat(1, src_len, 1) # (batch, src_len, hidden_dim) combined = torch.cat([hidden, encoder_outputs], dim=2) # (batch, src_len, 2*hidden_dim) energy = self.v(torch.tanh(self.W(combined))).squeeze(2) # (batch, src_len) return torch.softmax(energy, dim=1) class AttnEncoder(nn.Module): def __init__(self, vocab_size, emb_dim=128, hidden_dim=256): super().__init__() self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0) self.lstm = nn.LSTM(emb_dim, hidden_dim, num_layers=2, batch_first=True, bidirectional=True, dropout=0.1) self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim) # 双向输出拼接 self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim) def forward(self, src): embedded = self.embedding(src) outputs, (hidden, cell) = self.lstm(embedded) # outputs: (batch, src_len, 2*hidden_dim) # 取第二层双向隐藏状态 hidden = torch.cat([hidden[-2, :, :], hidden[-1, :, :]], dim=1) # (batch, 2*hidden_dim) cell = torch.cat([cell[-2, :, :], cell[-1, :, :]], dim=1) hidden = self.fc_hidden(hidden).unsqueeze(0) # (1, batch, hidden_dim) cell = self.fc_cell(cell).unsqueeze(0) return outputs, (hidden, cell) class AttnDecoder(nn.Module): def __init__(self, vocab_size, emb_dim=128, hidden_dim=256): super().__init__() self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0) self.attention = Attention(hidden_dim) self.lstm = nn.LSTM(emb_dim + 2 * hidden_dim, hidden_dim, 
num_layers=1, batch_first=True) self.fc = nn.Linear(hidden_dim + emb_dim, vocab_size) def forward(self, input_ids, hidden, cell, encoder_outputs): input_embedded = self.embedding(input_ids.unsqueeze(1)) # (batch, 1, emb_dim) attn_weights = self.attention(hidden.squeeze(0), encoder_outputs) # (batch, src_len) context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs) # (batch, 1, 2*hidden_dim) lstm_input = torch.cat([input_embedded, context], dim=2) # (batch, 1, emb_dim+2*hidden_dim) output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell)) # output: (batch, 1, hidden_dim) logits = self.fc(torch.cat([output.squeeze(1), input_embedded.squeeze(1)], dim=1)) # (batch, vocab_size) return logits, hidden, cell class AttnSeq2Seq(nn.Module): def __init__(self, vocab_size, emb_dim=128, hidden_dim=256): super().__init__() self.encoder = AttnEncoder(vocab_size, emb_dim, hidden_dim) self.decoder = AttnDecoder(vocab_size, emb_dim, hidden_dim) self.device = device self.sos_token_id = 101 # [CLS] self.eos_token_id = 102 # [SEP] self.unk_token_id = 100 # [UNK] def forward(self, src, tgt): encoder_outputs, (hidden, cell) = self.encoder(src) batch_size, tgt_len = tgt.size() outputs = torch.zeros(batch_size, tgt_len, self.decoder.fc.out_features).to(device) input_ids = tgt[:, 0] for t in range(1, tgt_len): logits, hidden, cell = self.decoder(input_ids, hidden, cell, encoder_outputs) outputs[:, t] = logits input_ids = tgt[:, t] return outputs def generate(self, src, max_length=80): encoder_outputs, (hidden, cell) = self.encoder(src) # 修正后的生成初始化 generated = torch.full((src.size(0), 1), self.sos_token_id, device=device) # 注意这里的修正 for _ in range(max_length-1): logits, hidden, cell = self.decoder(generated[:, -1], hidden, cell, encoder_outputs) next_token = torch.argmax(logits, dim=1, keepdim=True) # 防止过早生成标点 if generated.size(1) < 5: punctuation = [',', '.', ';', ':', '!', '?', "'", '"', '', '~'] punct_ids = [self.tokenizer.convert_tokens_to_ids(p) for p in punctuation] if next_token.item() in punct_ids: # 替换为最常见的实词 next_token = torch.tensor([[self.tokenizer.convert_tokens_to_ids('the')]], device=device) generated = torch.cat([generated, next_token], dim=1) if (next_token == self.eos_token_id).all(): break return generated # Transformer模型 class PositionalEncoding(nn.Module): def __init__(self, d_model, max_len=5000): super().__init__() pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) self.register_buffer('pe', pe.unsqueeze(0)) def forward(self, x): return x + self.pe[:, :x.size(1)] class TransformerModel(nn.Module): def __init__(self, vocab_size, d_model=128, nhead=8, num_layers=3, dim_feedforward=512, max_len=5000): super().__init__() self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0) self.pos_encoder = PositionalEncoding(d_model, max_len) # 编码器 encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout=0.1) self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers) # 解码器 decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout=0.1) self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers) self.fc = nn.Linear(d_model, vocab_size) self.d_model = d_model self.sos_token_id = 101 # [CLS] self.eos_token_id = 102 # [SEP] def _generate_square_subsequent_mask(self, sz): mask = 
(torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1) mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0)) return mask def forward(self, src, tgt): src_mask = None tgt_mask = self._generate_square_subsequent_mask(tgt.size(1)).to(device) src_key_padding_mask = (src == 0) tgt_key_padding_mask = (tgt == 0) src = self.embedding(src) * np.sqrt(self.d_model) src = self.pos_encoder(src) tgt = self.embedding(tgt) * np.sqrt(self.d_model) tgt = self.pos_encoder(tgt) memory = self.transformer_encoder(src.transpose(0, 1), src_mask, src_key_padding_mask) output = self.transformer_decoder( tgt.transpose(0, 1), memory, tgt_mask, None, tgt_key_padding_mask, src_key_padding_mask ) output = self.fc(output.transpose(0, 1)) return output def generate(self, src, max_length=80): src_mask = None src_key_padding_mask = (src == 0) src = self.embedding(src) * np.sqrt(self.d_model) src = self.pos_encoder(src) memory = self.transformer_encoder(src.transpose(0, 1), src_mask, src_key_padding_mask) batch_size = src.size(0) generated = torch.full((batch_size, 1), self.sos_token_id, device=device) for i in range(max_length-1): tgt_mask = self._generate_square_subsequent_mask(generated.size(1)).to(device) tgt_key_padding_mask = (generated == 0) tgt = self.embedding(generated) * np.sqrt(self.d_model) tgt = self.pos_encoder(tgt) output = self.transformer_decoder( tgt.transpose(0, 1), memory, tgt_mask, None, tgt_key_padding_mask, src_key_padding_mask ) output = self.fc(output.transpose(0, 1)[:, -1, :]) next_token = torch.argmax(output, dim=1, keepdim=True) generated = torch.cat([generated, next_token], dim=1) if (next_token == self.eos_token_id).all(): break return generated # 训练函数 def train_model(model, train_loader, optimizer, criterion, epochs=3): model.train() optimizer = optim.Adam(model.parameters(), lr=1e-4) scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=1, factor=0.5) start_time = time.time() for epoch in range(epochs): total_loss = 0 model.train() for i, batch in enumerate(train_loader): src = batch['input_ids'].to(device) tgt = batch['labels'].to(device) optimizer.zero_grad() outputs = model(src, tgt[:, :-1]) # 检查模型输出有效性 if torch.isnan(outputs).any(): print("警告:模型输出包含NaN,跳过此批次") continue loss = criterion(outputs.reshape(-1, outputs.size(-1)), tgt[:, 1:].reshape(-1)) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5) # 梯度裁剪 optimizer.step() total_loss += loss.item() if (i+1) % 10 == 0: print(f"Epoch {epoch+1}/{epochs} | Batch {i+1}/{len(train_loader)} | Loss: {loss.item():.4f}") avg_loss = total_loss / len(train_loader) scheduler.step(avg_loss) print(f"Epoch {epoch+1} | 平均损失: {avg_loss:.4f}") torch.cuda.empty_cache() total_time = time.time() - start_time print(f"训练完成!总耗时: {total_time:.2f}s ({total_time/60:.2f}分钟)") return model, total_time # 评估函数 def evaluate_model(model, val_loader, tokenizer, num_examples=2): model.eval() scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True) rouge_scores = {'rouge1': [], 'rouge2': [], 'rougeL': []} valid_count = 0 with torch.no_grad(): for i, batch in enumerate(val_loader): src = batch['input_ids'].to(device) tgt = batch['labels'].to(device) generated = model.generate(src) for s, p, t in zip(src, generated, tgt): src_txt = tokenizer.decode(s, skip_special_tokens=True) pred_txt = tokenizer.decode(p, skip_special_tokens=True) true_txt = tokenizer.decode(t[t != -100], skip_special_tokens=True) if len(pred_txt.split()) > 3 and len(true_txt.split()) > 3: valid_count += 
1 if valid_count <= num_examples: print(f"\n原文: {src_txt[:100]}...") print(f"生成: {pred_txt}") print(f"参考: {true_txt[:80]}...") print("-"*60) if true_txt and pred_txt: scores = scorer.score(true_txt, pred_txt) for key in rouge_scores: rouge_scores[key].append(scores[key].fmeasure) if valid_count > 0: avg_scores = {key: sum(rouge_scores[key])/len(rouge_scores[key]) for key in rouge_scores} print(f"\n评估结果 (基于{valid_count}个样本):") print(f"ROUGE-1: {avg_scores['rouge1']*100:.2f}%") print(f"ROUGE-2: {avg_scores['rouge2']*100:.2f}%") print(f"ROUGE-L: {avg_scores['rougeL']*100:.2f}%") else: print("警告:未生成有效摘要") avg_scores = {key: 0.0 for key in rouge_scores} return avg_scores # 可视化模型性能 def visualize_model_performance(model_names, train_times, rouge_scores): plt.figure(figsize=(15, 6)) # 训练时间对比图 plt.subplot(1, 2, 1) bars = plt.bar(model_names, train_times) plt.title('模型训练时间对比') plt.ylabel('时间 (分钟)') for bar in bars: height = bar.get_height() plt.text(bar.get_x() + bar.get_width()/2., height, f'{height:.1f} min', ha='center', va='bottom') # ROUGE分数对比图 plt.subplot(1, 2, 2) x = np.arange(len(model_names)) width = 0.25 plt.bar(x - width, [scores['rouge1'] for scores in rouge_scores], width, label='ROUGE-1') plt.bar(x, [scores['rouge2'] for scores in rouge_scores], width, label='ROUGE-2') plt.bar(x + width, [scores['rougeL'] for scores in rouge_scores], width, label='ROUGE-L') plt.title('模型ROUGE分数对比') plt.ylabel('F1分数') plt.xticks(x, model_names) plt.legend() plt.tight_layout() plt.savefig('performance_comparison.png') plt.show() print("性能对比图已保存为 performance_comparison.png") # 交互式文本摘要生成 def interactive_summarization(models, tokenizer, model_names, max_length=80): while True: print("\n" + "="*60) print("文本摘要交互式测试 (输入 'q' 退出)") print("="*60) input_text = input("请输入要摘要的文本:\n") if input_text.lower() == 'q': break if len(input_text) < 50: print("请输入更长的文本(至少50个字符)") continue # 生成摘要 inputs = tokenizer( input_text, max_length=384, truncation=True, padding='max_length', return_tensors='pt' ).to(device) print("\n生成摘要中...") all_summaries = [] for i, model in enumerate(models): model.eval() with torch.no_grad(): generated = model.generate(inputs["input_ids"]) summary = tokenizer.decode(generated[0], skip_special_tokens=True) all_summaries.append(summary) # 打印结果 print(f"\n{model_names[i]} 摘要:") print("-"*50) print(summary) print("-"*50) print("\n所有模型摘要对比:") for i, (name, summary) in enumerate(zip(model_names, all_summaries)): print(f"{i+1}. 
{name}: {summary}") # 主程序 print("加载数据集...") dataset = load_dataset("cnn_dailymail", "3.0.0") tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased') vocab_size = len(tokenizer.vocab) # 准备训练数据 print("准备训练数据...") train_ds = SummaryDataset(dataset['train'], tokenizer, subset_size=0.01) # 使用1%的数据 val_ds = SummaryDataset(dataset['validation'], tokenizer, subset_size=0.01) train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=0) val_loader = DataLoader(val_ds, batch_size=8, shuffle=False, num_workers=0) # 定义损失函数 criterion = nn.CrossEntropyLoss(ignore_index=-100) # 训练基础Seq2Seq print("\n" + "="*60) print("训练基础Seq2Seq模型") print("="*60) basic_model = BasicSeq2Seq(vocab_size).to(device) trained_basic, basic_time = train_model(basic_model, train_loader, None, criterion, epochs=3) basic_rouge = evaluate_model(trained_basic, val_loader, tokenizer) # 训练注意力Seq2Seq print("\n" + "="*60) print("训练注意力Seq2Seq模型") print("="*60) attn_model = AttnSeq2Seq(vocab_size).to(device) trained_attn, attn_time = train_model(attn_model, train_loader, None, criterion, epochs=3) attn_rouge = evaluate_model(trained_attn, val_loader, tokenizer) # 训练Transformer print("\n" + "="*60) print("训练Transformer模型") print("="*60) transformer_model = TransformerModel(vocab_size).to(device) trained_transformer, transformer_time = train_model(transformer_model, train_loader, None, criterion, epochs=3) transformer_rouge = evaluate_model(trained_transformer, val_loader, tokenizer) # 可视化模型性能 print("\n" + "="*60) print("模型性能对比") print("="*60) model_names = ['基础Seq2Seq', '注意力Seq2Seq', 'Transformer'] train_times = [basic_time/60, attn_time/60, transformer_time/60] rouge_scores = [basic_rouge, attn_rouge, transformer_rouge] visualize_model_performance(model_names, train_times, rouge_scores) # 交互式测试 print("\n" + "="*60) print("交互式文本摘要测试") print("="*60) print("提示:输入一段文本,将同时生成三个模型的摘要结果") interactive_summarization( [trained_basic, trained_attn, trained_transformer], tokenizer, model_names ) 修改完错误后发完整代码给我

import json import torch from typing import Dict, List, Optional, Tuple from torch.utils.data import Dataset from collections import defaultdict import transformers from peft import LoraConfig, TaskType, get_peft_model from torch.utils.data import DataLoader from transformers import Trainer, TrainingArguments from lora_plus import LoraPlusTrainer from swanlab.integration.transformers import SwanLabCallback import swanlab import numpy as np import pandas as pd import re from tqdm import tqdm from transformers import PreTrainedTokenizer, AutoTokenizer import torch.nn as nn from transformers import PreTrainedModel from torch.nn import CrossEntropyLoss, MSELoss # 分子公式解析函数 def parse_chem_formula(formula): pattern = r'([A-Z][a-z]?)(\d*)' matches = re.findall(pattern, formula) element_counts = defaultdict(int) for (element, count) in matches: count = int(count) if count else 1 element_counts[element] += count return element_counts def generate_element_list(formula): element_counts = parse_chem_formula(formula) elements = [] for element, count in element_counts.items(): if element != "H": elements.extend([element] * count) return ''.join(elements) # 初始化SwanLab swanlab.init("Finetune-Llama3.2-with-Encoder") swanlab_callback = SwanLabCallback( project="Finetune-Llama3.2-with-Encoder", experiment_name="Finetune-Llama3.2-with-Encoder" ) # 常量定义 CHEM_FORMULA_SIZE = r"([A-Z][a-z]*)([0-9]*)" VALID_ELEMENTS = ["C", "N", "P", "O", "S", "Si", "I", "H", "Cl", "F", "Br", "B", "Se", "Fe", "Co", "As", "K", "Na"] element_to_idx = {elem: idx for idx, elem in enumerate(VALID_ELEMENTS)} # 化学式转密集向量 def formula_to_dense(chem_formula: str) -> torch.Tensor: dense_vec = torch.zeros(len(VALID_ELEMENTS), dtype=torch.float32) matches = re.findall(CHEM_FORMULA_SIZE, chem_formula) for chem_symbol, num_str in matches: num = 1 if num_str == "" else int(num_str) if chem_symbol in element_to_idx: idx = element_to_idx[chem_symbol] dense_vec[idx] += num return dense_vec # 位置编码生成 def positional_encoding(max_position: int, d_model: int, min_freq: float = 1e-4) -> torch.Tensor: position = torch.arange(max_position).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(min_freq)) / d_model)) pos_enc = torch.zeros(max_position, d_model) pos_enc[:, 0::2] = torch.sin(position * div_term) pos_enc[:, 1::2] = torch.cos(position * div_term) return pos_enc # 初始化位置编码矩阵 P = positional_encoding(2000000, 254) dimn = 254 # 与位置编码维度一致 # 质谱数据编码 def encode_spectra(rag_tensor: list, P: torch.Tensor, dimn: int) -> torch.Tensor: encoded_list = [] for sample in rag_tensor: mz_list, intensity_list = sample base_features = torch.tensor([mz_list, intensity_list], dtype=torch.float32).T pos_enc = torch.stack([P[min(int(mz), P.size(0)-1)] for mz in mz_list]) features = torch.cat([base_features, pos_enc], dim=1) if features.size(0) < 501: padding = torch.zeros(501 - features.size(0), features.size(1)) features = torch.cat([features, padding], dim=0) else: features = features[:501] encoded_list.append(features) return torch.stack(encoded_list) # 质谱数据预处理 def preprocess_spectra(df: pd.DataFrame) -> list: spectra_list = [] for idx, row in tqdm(df.iterrows(), total=len(df)): spectrum_str = row['Spectrum'] total_mass = row['Total Exact Mass'] pairs = spectrum_str.split() mz_list, intensity_list = [], [] for pair in pairs: mz, intensity = pair.split(':') mz_list.append(float(mz)) intensity_list.append(float(intensity)) mz_list.append(total_mass) intensity_list.append(0.0) mz_list = [round(mz, 2) for mz in mz_list] intensity_list = 
[round(intensity, 2) for intensity in intensity_list] spectra_list.append([mz_list, intensity_list]) return spectra_list class MolecularDataset(Dataset): def __init__(self, csv_path: str, tokenizer: AutoTokenizer, max_seq_len: int = 512): self.df = pd.read_csv(csv_path) self.tokenizer = tokenizer self.max_seq_len = max_seq_len self.pad_token_id = tokenizer.pad_token_id self.mask_token_id = tokenizer.mask_token_id if tokenizer.mask_token_id is not None else tokenizer.convert_tokens_to_ids("<mask>") spectra_data = preprocess_spectra(self.df) self.spec_encoded = encode_spectra(spectra_data, P, dimn) self.element_lists = [generate_element_list(formula) for formula in self.df['Molecular Formula']] self.element_lengths = [] for elem_list in self.element_lists: elem_tokens = self.tokenizer(elem_list, add_special_tokens=False)['input_ids'] self.element_lengths.append(len(elem_tokens)) def __len__(self): return len(self.df) def __getitem__(self, idx) -> dict: formula = self.df.iloc[idx]['Molecular Formula'] formula_vec = formula_to_dense(formula).squeeze(0) # 压缩为1D向量 spec_matrix = self.spec_encoded[idx] element_list = self.element_lists[idx] element_text = f"<|Spectrum|>{element_list}" selfies_str = self.df.iloc[idx]['SELFIES'] selfies_text = f"{selfies_str}" input_text = f"{element_text}{selfies_text}" encoding = self.tokenizer( input_text, add_special_tokens=False, padding='max_length', truncation=True, max_length=self.max_seq_len, return_tensors='pt' ) input_ids = encoding['input_ids'].squeeze(0) attention_mask = encoding['attention_mask'].squeeze(0) labels = input_ids.clone() labels[labels == self.pad_token_id] = -100 element_len = self.element_lengths[idx] element_end = 3 + element_len # , <|Spectrum|>, 元素列表 if element_end < len(labels): labels[:element_end] = -100 return { 'encoder1_inputs': formula_vec, # 注意:现在是1D向量 'encoder2_inputs': spec_matrix, 'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': labels, 'formula_labels': formula_vec, # 添加元素计数标签 } # 加载tokenizer tokenizer = AutoTokenizer.from_pretrained('/root/workspace/d21lv5s7v38s73b4ddlg/checkpoint-2500') if tokenizer.mask_token is None: tokenizer.add_special_tokens({"mask_token": "<mask>"}) # 创建数据集 dataset = MolecularDataset('/root/workspace/d21lv5s7v38s73b4ddlg/SELFIES-SFT.csv', tokenizer) def custom_collator(features: List[Dict]) -> Dict: batch = { 'encoder1_inputs': torch.stack([f['encoder1_inputs'] for f in features]), # 形状: (batch_size, 18) 'encoder2_inputs': torch.stack([f['encoder2_inputs'] for f in features]), 'input_ids': torch.stack([f['input_ids'] for f in features]), 'attention_mask': torch.stack([f['attention_mask'] for f in features]), 'labels': torch.stack([f['labels'] for f in features]), 'formula_labels': torch.stack([f['formula_labels'] for f in features]), # 形状: (batch_size, 18) } return batch class ElementPredictionHead(nn.Module): """化学元素计数预测头部""" def __init__(self, hidden_size, output_size=18): super().__init__() self.dense = nn.Linear(hidden_size, hidden_size) self.activation = nn.ReLU() self.layer_norm = nn.LayerNorm(hidden_size) self.out_proj = nn.Linear(hidden_size, output_size) def forward(self, hidden_states): x = self.dense(hidden_states) x = self.activation(x) x = self.layer_norm(x) x = self.out_proj(x) return x class LlamaWithEncoder(PreTrainedModel): def __init__(self, base_model, encoder1_dim=18, encoder2_dim=256, hidden_dim=512): self.config = base_model.config super().__init__(self.config) self.model = base_model # 分子式编码器 encoder1_layer = nn.TransformerEncoderLayer( 
d_model=encoder1_dim, nhead=3, dim_feedforward=hidden_dim, batch_first=True ) self.encoder1 = nn.TransformerEncoder(encoder1_layer, num_layers=2) # 质谱编码器 encoder2_layer = nn.TransformerEncoderLayer( d_model=encoder2_dim, nhead=8, dim_feedforward=hidden_dim, batch_first=True ) self.encoder2 = nn.TransformerEncoder(encoder2_layer, num_layers=2) # 投影层 self.proj1 = nn.Linear(encoder1_dim, base_model.config.hidden_size) self.proj2 = nn.Linear(encoder2_dim, base_model.config.hidden_size) # 嵌入层 self.embed_tokens = nn.Embedding( num_embeddings=base_model.config.vocab_size, embedding_dim=base_model.config.hidden_size, padding_idx=base_model.config.pad_token_id ) self.embed_tokens.weight.data = base_model.get_input_embeddings().weight.data.clone() # 添加元素计数预测头 self.element_head = ElementPredictionHead(base_model.config.hidden_size) # PEFT所需方法 def get_input_embeddings(self): return self.embed_tokens def set_input_embeddings(self, value): self.embed_tokens = value def get_output_embeddings(self): return self.model.get_output_embeddings() def set_output_embeddings(self, new_embeddings): self.model.set_output_embeddings(new_embeddings) def get_base_model(self): return self.model def forward( self, input_ids: Optional[torch.LongTensor] = None, attention_mask: Optional[torch.FloatTensor] = None, encoder1_inputs: Optional[torch.FloatTensor] = None, encoder2_inputs: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, formula_labels: Optional[torch.FloatTensor] = None, # 新增:元素计数标签 past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, **kwargs ): return_dict = return_dict if return_dict is not None else self.config.use_return_dict # 1. 编码器处理 enc1_out = self.encoder1(encoder1_inputs.unsqueeze(1)) # 添加序列维度 enc1_out = enc1_out.mean(dim=1) # (batch_size, encoder1_dim) enc1_proj = self.proj1(enc1_out) # (batch_size, hidden_size) enc2_out = self.encoder2(encoder2_inputs) # (batch_size, 501, encoder2_dim) enc2_out = enc2_out.mean(dim=1) # (batch_size, encoder2_dim) enc2_proj = self.proj2(enc2_out) # (batch_size, hidden_size) # 合并编码器输出 mask_replacement = (enc1_proj + enc2_proj) / 2 # (batch_size, hidden_size) # 2. 获取原始嵌入 embeddings = self.embed_tokens(input_ids) # (batch_size, seq_len, hidden_size) batch_size, seq_len, hidden_size = embeddings.size() # 3. 替换<mask> token if seq_len > 2: mask_embed = mask_replacement.unsqueeze(1) # (batch_size, 1, hidden_size) part1 = embeddings[:, :2, :] # (batch_size, 2, hidden_size) part2 = mask_embed # (batch_size, 1, hidden_size) part3 = embeddings[:, 3:, :] # (batch_size, seq_len-3, hidden_size) new_embeddings = torch.cat([part1, part2, part3], dim=1) # (batch_size, seq_len, hidden_size) else: new_embeddings = embeddings # 4. 调用基础模型 model_output = self.model( inputs_embeds=new_embeddings, attention_mask=attention_mask, labels=labels, past_key_values=past_key_values, output_attentions=output_attentions, output_hidden_states=True, # 必须返回隐藏状态用于元素预测 return_dict=return_dict, **kwargs ) # 5. 
元素计数预测 element_pred = None element_loss = None if formula_labels is not None: # 获取最后一个非填充token的隐藏状态 seq_lengths = attention_mask.sum(dim=1) - 1 # 最后一个有效token的索引 batch_indices = torch.arange(batch_size, device=model_output.hidden_states[-1].device) last_token_hidden = model_output.hidden_states[-1][batch_indices, seq_lengths] # (batch_size, hidden_size) # 预测元素计数 element_pred = self.element_head(last_token_hidden) # (batch_size, 18) # 计算元素计数损失(MSE损失) element_loss = MSELoss()(element_pred, formula_labels) # 组合总损失:语言模型损失 + 元素计数损失 total_loss = model_output.loss + 0.5 * element_loss else: total_loss = model_output.loss # 返回结果 if not return_dict: output = (model_output.logits,) if element_pred is not None: output += (element_pred,) return (total_loss,) + output if total_loss is not None else output return { 'loss': total_loss, 'logits': model_output.logits, 'element_pred': element_pred, 'element_loss': element_loss, 'hidden_states': model_output.hidden_states, 'past_key_values': model_output.past_key_values, 'attentions': model_output.attentions } # 加载预训练模型 base_model = transformers.AutoModelForCausalLM.from_pretrained( "/root/workspace/d21lv5s7v38s73b4ddlg/checkpoint-2500", trust_remote_code=True, torch_dtype=torch.bfloat16, ) model = LlamaWithEncoder(base_model) # 配置LoRA lora_config = LoraConfig( r=8, lora_alpha=16, target_modules="all-linear", lora_dropout=0.0, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, lora_config) model.print_trainable_parameters() # 训练参数 training_args = TrainingArguments( output_dir="./llama3.2-SELFIES-SFT", per_device_train_batch_size=24, gradient_accumulation_steps=24, num_train_epochs=12, learning_rate=5.0e-05, optim="adamw_torch", logging_steps=10, bf16=True, save_strategy="steps", lr_scheduler_type='cosine', max_grad_norm=1.0, save_steps=2000, warmup_steps=0 ) class CustomTrainer(LoraPlusTrainer): def get_train_dataloader(self) -> DataLoader: return DataLoader( self.train_dataset, batch_size=self.args.train_batch_size, shuffle=True, collate_fn=self.data_collator, drop_last=False, ) # 训练模型 lp_trainer = CustomTrainer( model, training_args, train_dataset=dataset, tokenizer=tokenizer, data_collator=custom_collator, callbacks=[swanlab_callback], ) lp_trainer.train() lp_trainer.save_model(output_dir='./llama3.2-SELFIES-SFT') # 合并LoRA权重并移除元素预测头 model = model.merge_and_unload() model.element_head = None # 移除元素预测头 # 保存模型(不包括元素预测头) save_directory = './llama3.2-SELFIES' model.save_pretrained(save_directory, safe_serialization=True) tokenizer.save_pretrained(save_directory)不对,要对应修改为 element_text = f"<|User|><mask>{element_list}" # SELFIES目标序列并添加标记 selfies_str = self.df.iloc[idx]['SELFIES'] selfies_text = f"<|Assistant|>{selfies_str}",同时化学元素计数预测模型的输入token取<|Assistant|>token之后的,写出完整的修改代码

class HybridFeatureFusion(nn.Module): def init(self, in_channels=None, hidden_dim=256, nhead=8, dim_feedforward=1024, dropout=0.0, enc_act=“gelu”, num_encoder_layers=1): super(HybridFeatureFusion, self).init() # encoder transformer if in_channels is None: in_channels = [256, 512, 1024] self.hidden_dim = hidden_dim self.nhead = nhead self.dim_feedforward = dim_feedforward self.dropout = dropout self.num_encoder_layers = num_encoder_layers self.pe_temperature =10000 encoder_layer = TransformerEncoderLayer( hidden_dim, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout, activation=enc_act) # self.input_proj = nn.Sequential( # nn.Conv2d(in_channel, hidden_dim, kernel_size=1, bias=False), # nn.BatchNorm2d(hidden_dim) # ) # channel projection self.input_proj = nn.ModuleList() for in_channel in in_channels: self.input_proj.append( nn.Sequential( nn.Conv2d(in_channel, hidden_dim, kernel_size=1, bias=False), nn.BatchNorm2d(hidden_dim) ) ) self.encoder = TransformerEncoder(copy.deepcopy(encoder_layer), num_encoder_layers) self.cross_attn1 = CrossScaleAttention(256) self.cross_attn2 = CrossScaleAttention(256) self.ca = ChannelAttention(256) self.fusion_norm = nn.ModuleList([ nn.Sequential( nn.BatchNorm2d(256), nn.ReLU(inplace=True)) for _ in range(3)]) # Step 3: 多尺度卷积扩展 self.aspp = ASPP(256, 512) self.final_conv = nn.Sequential( nn.Conv2d(512, 1024, 3, padding=1), nn.BatchNorm2d(1024), nn.ReLU(), nn.Conv2d(1024, 2048, 3, padding=1), nn.BatchNorm2d(2048), nn.ReLU() ) @staticmethod def build_2d_sincos_position_embedding(w, h, embed_dim=256, temperature=10000.): ''' 动态生成位置编码 ''' grid_w = torch.arange(int(w), dtype=torch.float32) grid_h = torch.arange(int(h), dtype=torch.float32) grid_w, grid_h = torch.meshgrid(grid_w, grid_h, indexing='ij') assert embed_dim % 4 == 0, \ 'Embed dimension must be divisible by 4 for 2D sin-cos position embedding' pos_dim = embed_dim // 4 omega = torch.arange(pos_dim, dtype=torch.float32) / pos_dim omega = 1. / (temperature ** omega) out_w = grid_w.flatten()[..., None] @ omega[None] out_h = grid_h.flatten()[..., None] @ omega[None] return torch.concat([out_w.sin(), out_w.cos(), out_h.sin(), out_h.cos()], dim=1)[None, :, :] def forward(self, feats): # 通道映射 1024->256 proj_feats = [self.input_proj[i](feat) for i, feat in enumerate(feats)] # 展平 flatten [B, C, H, W] to [B, HxW, C] h, w = proj_feats[2].shape[2:] src_flatten = proj_feats[2].flatten(2).permute(0, 2, 1) # 获取位置编码 pos_embed = self.build_2d_sincos_position_embedding( w, h, self.hidden_dim, self.pe_temperature).to(src_flatten.device) # 提取全局特征 memory = self.encoder(src_flatten, pos_embed=pos_embed) # 对输出结果进行unflatten,变回原来的大小 proj_feats[2] = memory.permute(0, 2, 1).reshape(-1, self.hidden_dim, h, w).contiguous() feats = proj_feats ### Step 1: 跨尺度注意力交互 feats[1] = self.fusion_norm[1](self.cross_attn2(feats[1], feats[2])) feats[0] = self.fusion_norm[0](self.cross_attn1(feats[0], feats[1])) ### Step 2: 空间对齐与动态加权 feats[0] = F.adaptive_avg_pool2d(feats[0], feats[2].shape[-1] // 2) feats[1] = F.adaptive_avg_pool2d(feats[1], feats[2].shape[-1] // 2) feats[2] = F.adaptive_avg_pool2d(feats[2], feats[2].shape[-1] // 2) combined = self.fusion_norm[2](self.ca(feats[0]) + self.ca(feats[1]) + self.ca(feats[2])) ### Step 3: 多尺度卷积扩展 output = self.final_conv(self.aspp(combined)) return output 我现在正在进行异常检测任务,帮我优化一下这个特征融合模块,降低部分参数量,输出的特征用于还原预训练特征,最终稿用来定位异常位置,给出完整优化代码

import json import torch from typing import Dict, List from torch.utils.data import Dataset import transformers from peft import LoraConfig, TaskType, get_peft_model from torch.utils.data import DataLoader, SequentialSampler from transformers import Trainer, TrainingArguments from lora_plus import LoraPlusTrainer from torch.utils.data import RandomSampler from swanlab.integration.transformers import SwanLabCallback import swanlab import numpy as np import pandas as pd import re from typing import Dict, List import torch from tqdm import tqdm from transformers import PreTrainedTokenizer from transformers import AutoTokenizer import torch.nn as nn swanlab.init("Finetune-Llama3.2-with-Encoder") swanlab_callback = SwanLabCallback( project="Finetune-Llama3.2-with-Encoder", experiment_name="Finetune-Llama3.2-with-Encoder" ) # 常量定义 CHEM_FORMULA_SIZE = "([A-Z][a-z]*)([0-9]*)" VALID_ELEMENTS = ["C", "N", "P", "O", "S", "Si", "I", "H", "Cl", "F", "Br", "B", "Se", "Fe", "Co", "As", "K", "Na"] ELEMENT_VECTORS = np.eye(len(VALID_ELEMENTS)) element_to_position = dict(zip(VALID_ELEMENTS, ELEMENT_VECTORS)) # 化学式转密集向量 def formula_to_dense(chem_formula: str) -> np.ndarray: total_onehot = [] for (chem_symbol, num) in re.findall(CHEM_FORMULA_SIZE, chem_formula): num = 1 if num == "" else int(num) one_hot = element_to_position[chem_symbol].reshape(1, -1) one_hot_repeats = np.repeat(one_hot, repeats=num, axis=0) total_onehot.append(one_hot_repeats) if len(total_onehot) == 0: dense_vec = np.zeros(len(VALID_ELEMENTS)) else: dense_vec = np.vstack(total_onehot).sum(0) return dense_vec # 正弦嵌入 def sine_embed(v, max_count=256): num_freqs = int(np.ceil(np.log2(max_count))) freqs = 0.5 ** torch.arange(num_freqs, dtype=torch.float32) * np.pi v_tensor = torch.tensor(v, dtype=torch.float32)[:, None] embedded = torch.sin(v_tensor * freqs[None, :]) return torch.abs(embedded).numpy() def positional_encoding(max_position, d_model, min_freq=1e-6): position = np.arange(max_position) freqs = min_freq **(2 * (np.arange(d_model) // 2) / d_model) pos_enc = position.reshape(-1, 1) * freqs.reshape(1, -1) pos_enc[:, ::2] = np.cos(pos_enc[:, ::2]) pos_enc[:, 1::2] = np.sin(pos_enc[:, 1::2]) return pos_enc # 生成位置编码 P = positional_encoding(2000000, 256, min_freq=1e2) # 转换为PyTorch张量以便后续使用 P = torch.tensor(P, dtype=torch.float32) dimn = 255 # 质谱数据编码(修复后) def encoding(rag_tensor, P, dimn): to_pad = [] for sample in rag_tensor: # 直接使用列表(因为sample[0]和sample[1]是Python列表) all_dim = [sample[0]] # 移除.tolist(),因为本身就是列表 # 处理位置编码(sample[1]是列表,直接遍历) pos_enc = [P[int(i)-1] for i in sample[1]] for dim_idx in range(dimn): dim_vals = [i[dim_idx].item() for i in pos_enc] all_dim.append(dim_vals) to_pad.append(all_dim) # 使用PyTorch进行序列填充 padded = [] for i in to_pad: # 转换为张量 tensor = torch.tensor(i, dtype=torch.float32) # 计算需要填充的长度 pad_length = max(0, 501 - tensor.size(1)) # 进行后向填充 padded_tensor = torch.nn.functional.pad(tensor, (0, pad_length), mode='constant', value=0) # 如果长度超过501,则截断 if padded_tensor.size(1) > 501: padded_tensor = padded_tensor[:, :501] padded.append(padded_tensor) # 堆叠并交换轴 to_pad = torch.stack(padded) to_pad = to_pad.permute(0, 2, 1) # 相当于numpy的swapaxes(to_pad, 1, -1) return to_pad # 质谱数据预处理(PyTorch实现) def prepro_specs_train(df): df = df.reset_index(drop=True) valid = [] mz_intensity = df['Spectrum'].to_list() def process_line(line): pairs = line.split() mz_list = [] intensity_list = [] for pair in pairs: mz, intensity = pair.split(':') mz_list.append(float(mz)) intensity_list.append(float(intensity)) return mz_list, intensity_list for 
idx, intensities in tqdm(enumerate(mz_intensity)): mz_list, intensity_list = process_line(intensities) # 添加总精确质量和0强度值 mz_list.append(float(df.at[idx, 'Total Exact Mass'])) intensity_list.append(0.0) # 四舍五入处理 round_mz_list = [round(float(mz), 2) for mz in mz_list] round_intensity_list = [round(float(intensity), 2) for intensity in intensity_list] valid.append([round_mz_list, round_intensity_list]) return valid # 返回列表的列表 # 自定义数据集类 class CSVDataset(torch.utils.data.Dataset): def __init__(self, csv_path, tokenizer: PreTrainedTokenizer, max_selfies_len=512): self.df = pd.read_csv(csv_path) self.tokenizer = tokenizer self.max_selfies_len = max_selfies_len # 预处理质谱数据 spec_df = self.df[['Total Exact Mass', 'Spectrum']].copy() self.rag_tensor = prepro_specs_train(spec_df) self.spec_encoded = encoding(self.rag_tensor, P, dimn) def __len__(self): return len(self.df) def __getitem__(self, idx) -> Dict[str, torch.Tensor]: # 1. 处理分子式 formula = self.df.iloc[idx]['Molecular Formula'] formula_vec = formula_to_dense(formula) # 形状: (18,) # 2. 处理质谱数据 spec_matrix = self.spec_encoded[idx] # 形状: (501, 257) # 3. 处理SELFIES - 添加attention_mask selfies_str = self.df.iloc[idx]['SELFIES'] # 编码时同时获取input_ids和attention_mask encoding_result = self.tokenizer.encode_plus( selfies_str, add_special_tokens=True, # 添加[CLS]和[SEP] max_length=self.max_selfies_len, padding='max_length', truncation=True, return_attention_mask=True, return_tensors='pt' ) input_ids = encoding_result['input_ids'].squeeze(0) attention_mask = encoding_result['attention_mask'].squeeze(0) return { 'formula_vec': torch.tensor(formula_vec, dtype=torch.float32), 'spec_matrix': spec_matrix, # 已为tensor,无需重复转换 'selfies_ids': input_ids, 'attention_mask': attention_mask } # 初始化tokenizer tokenizer = AutoTokenizer.from_pretrained('/root/workspace/checkpoint-2500') # 创建数据集 dataset = CSVDataset('/root/workspace/SELFIES-SFT.csv', tokenizer) data_collator = transformers.DataCollatorForSeq2Seq( tokenizer=tokenizer) # 定义带额外Encoder的自定义模型 class LlamaWithEncoder(nn.Module): def __init__(self, base_model, encoder1_dim=18, encoder2_dim=256, hidden_dim=512): super().__init__() self.base_model = base_model # 第一个Transformer Encoder encoder1_layer = nn.TransformerEncoderLayer( d_model=encoder1_dim, nhead=3, dim_feedforward=hidden_dim, batch_first=True ) self.encoder1 = nn.TransformerEncoder(encoder1_layer, num_layers=2) # 第二个Transformer Encoder encoder2_layer = nn.TransformerEncoderLayer( d_model=encoder2_dim, nhead=8, dim_feedforward=hidden_dim, batch_first=True ) self.encoder2 = nn.TransformerEncoder(encoder2_layer, num_layers=2) # 投影层 self.proj1 = nn.Linear(encoder1_dim, base_model.config.hidden_size) self.proj2 = nn.Linear(encoder2_dim, base_model.config.hidden_size) # 融合层 self.fusion = nn.Linear(2 * base_model.config.hidden_size, base_model.config.hidden_size) def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs): return self.base_model.prepare_inputs_for_generation( input_ids, past_key_values=past_key_values, **kwargs ) def forward( self, input_ids=None, attention_mask=None, encoder1_inputs=None, encoder2_inputs=None, labels=None, past_key_values=None, output_attentions=None, output_hidden_states=None, return_dict=None, **kwargs ): # 处理编码器输入 enc1_out = self.encoder1(encoder1_inputs) enc1_out = enc1_out.mean(dim=1) enc1_proj = self.proj1(enc1_out) enc2_out = self.encoder2(encoder2_inputs) enc2_out = enc2_out.mean(dim=1) enc2_proj = self.proj2(enc2_out) # 融合编码器输出 fused = self.fusion(torch.cat([enc1_proj, enc2_proj], dim=1)) fused = 
fused.unsqueeze(1) # 获取嵌入层输出 embeddings = self.base_model.get_input_embeddings()(input_ids) # 将融合结果与第一个token的嵌入结合 if embeddings.size(1) > 0: embeddings[:, 0, :] = (embeddings[:, 0, :] + fused[:, 0, :]) / 2 # 使用修改后的嵌入调用基础模型 return self.base_model( inputs_embeds=embeddings, attention_mask=attention_mask, labels=labels, past_key_values=past_key_values, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, **kwargs ) # 加载预训练模型 base_model = transformers.AutoModelForCausalLM.from_pretrained( "/root/workspace/checkpoint-2500", trust_remote_code=True, torch_dtype=torch.bfloat16, ) model = LlamaWithEncoder(base_model) lora_config = LoraConfig( r=8, lora_alpha=16, target_modules="all-linear", # 目标注意力层 lora_dropout=0.0, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, lora_config) model.print_trainable_parameters() # 输出示例:0.3% 参数可训练 training_args = TrainingArguments( output_dir="./llama3.2-SELFIES-SFT", per_device_train_batch_size=16, gradient_accumulation_steps=16, num_train_epochs=10, learning_rate=5.0e-05, optim="adamw_torch", logging_steps=10, bf16=True, save_strategy="steps", lr_scheduler_type='cosine', max_grad_norm=1.0, save_steps=2000, warmup_steps=0 ) class CustomTrainer(LoraPlusTrainer): def get_train_dataloader(self) -> DataLoader: """ Returns the training dataloader using a random sampler to shuffle the dataset. """ return DataLoader( self.train_dataset, batch_size=self.args.train_batch_size, shuffle=True, collate_fn=self.data_collator, drop_last=False, ) # 使用修改后的 CustomTrainer lp_trainer = CustomTrainer( model, training_args, train_dataset=dataset, tokenizer=tokenizer, data_collator=data_collator, callbacks=[swanlab_callback], ) lp_trainer.train() lp_trainer.save_model(output_dir='./llama3.2-SELFIES-SFT') 修改代码,确保添加的Encoder可以顺利进行lora微调

使用transfomers库实现encoder-decoder架构的,encoder和decoder都是transformerxl的,使用旋转位置编码的示例代码,旋转编码实现代码如下import torch class RotaryEmbedding(torch.nn.Module): def __init__(self, dim, base=10000): super().__init__() inv_freq = 1. / (base ** (torch.arange(0, dim, 2).float() / dim)) self.register_buffer('inv_freq', inv_freq) self.seq_len_cached = 0 self.cos_cached = None self.sin_cached = None def forward(self, x, seq_dim=1): seq_len = x.shape[seq_dim] if seq_len != self.seq_len_cached: #if seq_len > self.seq_len_cached: self.seq_len_cached = seq_len t = torch.arange(x.shape[seq_dim], device=x.device).type_as(self.inv_freq) freqs = torch.einsum('i,j->ij', t, self.inv_freq) emb = torch.cat((freqs, freqs), dim=-1).to(x.device) self.cos_cached = emb.cos()[None,:, None, :] self.sin_cached = emb.sin()[None,:, None, :] #else: # cos_return = self.cos_cached[..., :seq_len] # sin_return = self.sin_cached[..., :seq_len] # return cos_return, sin_return return self.cos_cached, self.sin_cached # rotary pos emb helpers: def rotate_half(x): x1, x2 = x[..., :x.shape[-1] // 2], x[..., x.shape[-1] // 2:] return torch.cat((-x2, x1), dim=x1.ndim - 1) # dim=-1 triggers a bug in earlier torch versions @torch.jit.script def apply_rotary_pos_emb(q, k, cos, sin): return (q * cos) + (rotate_half(q) * sin), (k * cos) + (rotate_half(k) * sin)from torch.nn import Linear, Module from fast_transformers.attention import AttentionLayer from fast_transformers.events import EventDispatcher, QKVEvent from .rotary import RotaryEmbedding, apply_rotary_pos_emb class RotateAttentionLayer(AttentionLayer): """Rotate attention layer inherits from fast_transformer attention layer. The only thing added is an Embedding encoding, for more information on the attention layer see the fast_transformers code """ def __init__(self, attention, d_model, n_heads, d_keys=None, d_values=None, event_dispatcher=""): super(RotateAttentionLayer, self).__init__(attention,d_model, n_heads, d_keys=d_keys, d_values=d_values, event_dispatcher=event_dispatcher) self.rotaryemb = RotaryEmbedding(d_keys) print('Using Rotation Embedding') def forward(self, queries, keys, values, attn_mask, query_lengths, key_lengths): """ Using the same frame work as the fast_Transformers attention layer but injecting rotary information to the queries and the keys after the keys and queries are projected. In the argument description we make use of the following sizes - N: the batch size - L: The maximum length of the queries - S: The maximum length of the keys (the actual length per sequence is given by the length mask) - D: The input feature dimensionality passed in the constructor as 'd_model' Arguments --------- queries: (N, L, D) The tensor containing the queries keys: (N, S, D) The tensor containing the keys values: (N, S, D) The tensor containing the values attn_mask: An implementation of BaseMask that encodes where each query can attend to query_lengths: An implementation of BaseMask that encodes how many queries each sequence in the batch consists of key_lengths: An implementation of BaseMask that encodes how many queries each sequence in the batch consists of Returns ------- The new value for each query as a tensor of shape (N, L, D). 
""" # Extract the dimensions into local variables N, L, _ = queries.shape _, S, _ = keys.shape H = self.n_heads # Project the queries/keys/values queries = self.query_projection(queries).view(N, L, H, -1) keys = self.key_projection(keys).view(N, S, H, -1) cos, sin = self.rotaryemb(queries) queries, keys = apply_rotary_pos_emb(queries, keys, cos, sin) values = self.value_projection(values).view(N, S, H, -1) # Let the world know of the qkv self.event_dispatcher.dispatch(QKVEvent(self, queries, keys, values)) # Compute the attention new_values = self.inner_attention( queries, keys, values, attn_mask, query_lengths, key_lengths ).view(N, L, -1) # Project the output and return return self.out_projection(new_values)

import os os.environ['KMP_DUPLICATE_LIB_OK'] = 'True' import torch import torch.nn as nn import torch.optim as optim import torch.utils.data as Data import numpy as np import pandas as pd from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split import matplotlib.pyplot as plt from joblib import dump, load from time import time from mealpy.swarm_based import PSO from mealpy.evolutionary_based import GA from sko.SA import SA as SKO_SA from mealpy.swarm_based import ACOR from mealpy.swarm_based import WOA from mealpy.swarm_based import GWO # ==================== 1. 设备设置与随机种子 ==================== torch.manual_seed(100) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # ==================== 2. 数据加载与预处理 ==================== data = pd.read_csv('D:/PycharmProjects/PythonProject3/liaotou4.csv') feature_columns = data.columns[1:-2] target_columns = data.columns[-2:] # 可多目标 X_all, y_all = data[feature_columns].values, data[target_columns].values # 特征归一化 scaler_x = StandardScaler() X_all = scaler_x.fit_transform(X_all) dump(scaler_x, 'scaler_x') # 目标归一化 scaler_y = StandardScaler() y_all = scaler_y.fit_transform(y_all) dump(scaler_y, 'scaler_y') # 构建序列数据 seq_len = 60 X_seq, y_seq = [], [] for i in range(len(X_all) - seq_len): X_seq.append(X_all[i:i+seq_len]) y_seq.append(y_all[i+seq_len]) X_seq = torch.tensor(np.array(X_seq), dtype=torch.float32) y_seq = torch.tensor(np.array(y_seq), dtype=torch.float32) # 数据集划分 # 数据集按时序划分 train_size = int(0.7 * len(X_seq)) train_x, test_x = X_seq[:train_size], X_seq[train_size:] train_y, test_y = y_seq[:train_size], y_seq[train_size:] batch_size = 64 train_loader = Data.DataLoader(Data.TensorDataset(train_x, train_y), batch_size=batch_size, shuffle=True, drop_last=True) test_loader = Data.DataLoader(Data.TensorDataset(test_x, test_y), batch_size=batch_size, drop_last=True) # ==================== 3. 定义模型 ==================== from torch.nn import TransformerEncoder, TransformerEncoderLayer class TransformerBiLSTM(nn.Module): def __init__(self, input_dim, hidden_layer_sizes, hidden_dim, num_layers, num_heads, output_dim, dropout_rate=0.5): super().__init__() self.transformer = TransformerEncoder( TransformerEncoderLayer(input_dim, num_heads, hidden_dim, dropout=dropout_rate, batch_first=True), num_layers ) self.num_layers = len(hidden_layer_sizes) self.bilstm_layers = nn.ModuleList() self.bilstm_layers.append(nn.LSTM(input_dim, hidden_layer_sizes[0], batch_first=True, bidirectional=True)) for i in range(1, self.num_layers): self.bilstm_layers.append(nn.LSTM(hidden_layer_sizes[i-1]*2, hidden_layer_sizes[i], batch_first=True, bidirectional=True)) self.linear = nn.Linear(hidden_layer_sizes[-1]*2, output_dim) def forward(self, input_seq): transformer_output = self.transformer(input_seq) bilstm_out = transformer_output for bilstm in self.bilstm_layers: bilstm_out, _ = bilstm(bilstm_out) predict = self.linear(bilstm_out[:, -1, :]) return predict # ==================== 4. 
VPPSO算法实现 ==================== def vppso( func, dim, bounds, N=12, N1=6, N2=6, T=16, a=0.3, c1=2.0, c2=2.0, b=1.0, verbose=True ): X = np.random.uniform([b[0] for b in bounds], [b[1] for b in bounds], (N, dim)) V = np.zeros((N, dim)) Pbest = X.copy() Pbest_f = np.array([func(x) for x in X]) gbest_idx = np.argmin(Pbest_f) Gbest = Pbest[gbest_idx].copy() Gbest_f = Pbest_f[gbest_idx] best_curve = [Gbest_f] for t in range(T): alpha_t = np.exp(-b * (t / T) ** b) for i in range(N): if i < N1: if np.random.rand() < a: V[i] = V[i] else: r1, r2, r3 = np.random.rand(3) V[i] = (V[i] * r1 * alpha_t + c1 * r2 * (Pbest[i] - X[i]) + c2 * r3 * (Gbest - X[i])) X[i] = X[i] + V[i] else: if np.random.rand() < 0.5: X[i] = Gbest + alpha_t * np.random.rand(dim) * np.abs(Gbest * alpha_t) else: X[i] = Gbest - alpha_t * np.random.rand(dim) * np.abs(Gbest * alpha_t) # 边界处理 for d in range(dim): if X[i, d] < bounds[d][0]: X[i, d] = bounds[d][0] if X[i, d] > bounds[d][1]: X[i, d] = bounds[d][1] # 适应度 F = np.array([func(x) for x in X]) for i in range(N): if i < N1: if F[i] < Pbest_f[i]: Pbest[i] = X[i].copy() Pbest_f[i] = F[i] if F[i] < Gbest_f: Gbest = X[i].copy() Gbest_f = F[i] else: if F[i] < Gbest_f: Gbest = X[i].copy() Gbest_f = F[i] best_curve.append(Gbest_f) if verbose and (t % 4 == 0 or t == T-1): print(f"Iter {t+1}/{T}, Best fitness: {Gbest_f}") return Gbest, Gbest_f, best_curve # ==================== 5. 定义超参数搜索空间与适应度函数 ==================== param_bounds = [ (32, 128), # hidden_layer_sizes[0] (32, 128), # hidden_layer_sizes[1] (64, 256), # hidden_dim (1, 4), # num_layers (1, 4), # num_heads (0.05, 0.5), # dropout_rate (0.00005, 0.005) # learning rate ] def eval_model_hyperparams(x): h1 = int(round(x[0])) h2 = int(round(x[1])) hidden_dim = int(round(x[2])) num_layers = int(round(x[3])) num_heads = int(round(x[4])) dropout = float(x[5]) lr = float(x[6]) try: model = TransformerBiLSTM( input_dim=X_seq.shape[2], hidden_layer_sizes=[h1, h2], hidden_dim=hidden_dim, num_layers=num_layers, num_heads=num_heads, output_dim=y_seq.shape[1], dropout_rate=dropout ).to(device) optimizer = optim.Adam(model.parameters(), lr) loss_function = nn.MSELoss(reduction='sum') best_mse = 1000. for epoch in range(4): # 搜索时只训练4个epoch model.train() for seq, labels in train_loader: seq, labels = seq.to(device), labels.to(device) optimizer.zero_grad() y_pred = model(seq) loss = loss_function(y_pred, labels) loss.backward() optimizer.step() model.eval() with torch.no_grad(): test_loss = 0. test_size = len(test_loader.dataset) for data, label in test_loader: data, label = data.to(device), label.to(device) pre = model(data) test_loss += loss_function(pre, label).item() test_av_mseloss = test_loss / test_size if test_av_mseloss < best_mse: best_mse = test_av_mseloss return best_mse except Exception as e: print("Exception in eval:", e) return 9999. 
def run_sa(): bounds = np.array(param_bounds) x0 = [(b[0] + b[1]) / 2 for b in param_bounds] sa = SKO_SA( func=lambda x: eval_model_hyperparams(np.clip(x, bounds[:, 0], bounds[:, 1])), x0=x0, T_max=50, T_min=1, L=30, max_stay_counter=20 ) best_param, best_loss = sa.run() return best_param, best_loss from mealpy import Problem problem = Problem( fit_func=eval_model_hyperparams, bounds=param_bounds, minmax="min" ) optimizer_dict = { 'GA': lambda: GA.BaseGA(problem, epoch=16, pop_size=12).solve().solution[:2], 'PSO': lambda: PSO.BasePSO(problem, epoch=16, pop_size=12).solve().solution[:2], 'ACO': lambda: ACOR.BaseACOR(problem, epoch=16, pop_size=12).solve().solution[:2], 'WOA': lambda: WOA.BaseWOA(problem, epoch=16, pop_size=12).solve().solution[:2], 'GWO': lambda: GWO.BaseGWO(problem, epoch=16, pop_size=12).solve().solution[:2], 'SA': run_sa, 'VPPSO': lambda: vppso(eval_model_hyperparams, len(param_bounds), param_bounds, N=12, N1=6, N2=6, T=16, a=0.3, c1=2.0, c2=2.0, b=1.0, verbose=False)[:2] } final_results = {} show_n = 100 # 展示前100个样本 alg_colors = { 'VPPSO': 'blue', 'GA': 'red', 'PSO': 'green', 'SA': 'purple', 'ACOR': 'orange', 'WOA': 'deepskyblue', 'GWO': 'brown' } for alg_name, alg_func in optimizer_dict.items(): print(f"\n------ 开始{alg_name}优化Transformer-BiLSTM超参数 ------") best_param, best_loss = alg_func() # 还原参数 h1 = int(round(best_param[0])) h2 = int(round(best_param[1])) hidden_dim = int(round(best_param[2])) num_layers = int(round(best_param[3])) num_heads = int(round(best_param[4])) dropout = float(best_param[5]) lr = float(best_param[6]) print(f'{alg_name}最优超参数: {best_param}, 验证loss: {best_loss}') # 训练模型 model = TransformerBiLSTM( input_dim=X_seq.shape[2], hidden_layer_sizes=[h1, h2], hidden_dim=hidden_dim, num_layers=num_layers, num_heads=num_heads, output_dim=y_seq.shape[1], dropout_rate=dropout ).to(device) optimizer = optim.Adam(model.parameters(), lr) loss_function = nn.MSELoss(reduction='sum') # 为加快总流程,只训练epochs=40,你可调整 def train_short(model, epochs): train_size = len(train_loader.dataset) test_size = len(test_loader.dataset) minimum_mse = 1000. best_model_wts = model.state_dict() for epoch in range(epochs): model.train() for seq, labels in train_loader: seq, labels = seq.to(device), labels.to(device) optimizer.zero_grad() y_pred = model(seq) loss = loss_function(y_pred, labels) loss.backward() optimizer.step() model.eval() with torch.no_grad(): test_loss = 0. 
for data, label in test_loader: data, label = data.to(device), label.to(device) pre = model(data) test_loss += loss_function(pre, label).item() test_av_mseloss = test_loss / test_size if test_av_mseloss < minimum_mse: minimum_mse = test_av_mseloss best_model_wts = model.state_dict() model.load_state_dict(best_model_wts) train_short(model, epochs=40) torch.save(model.state_dict(), f'best_model_{alg_name}.pt') # 预测前100个 model.eval() original_data = [] pre_data = [] with torch.no_grad(): for data, label in test_loader: data, label = data.to(device), label.to(device) y_pred = model(data) original_data.append(label.cpu().numpy()) pre_data.append(y_pred.cpu().numpy()) original_data = np.concatenate(original_data, axis=0) pre_data = np.concatenate(pre_data, axis=0) scaler_y = load('scaler_y') original_100 = scaler_y.inverse_transform(original_data)[:show_n, 0] pre_100 = scaler_y.inverse_transform(pre_data)[:show_n, 0] final_results[alg_name] = (original_100, pre_100) # ======================= 结果可视化对比 ====================== plt.figure(figsize=(14, 7)) plt.plot(final_results['VPPSO'][0], color='gray', label='真实值', linewidth=2, linestyle='--') for alg_name, (orig, pred) in final_results.items(): plt.plot(pred, color=alg_colors[alg_name], label=f'{alg_name}优化', alpha=0.85) plt.xlabel('样本编号') plt.ylabel('预测输出') plt.title('不同智能优化算法下Transformer-BiLSTM预测对比(前100样本)') plt.legend() plt.tight_layout() plt.show()
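在把 vppso 用于上面这种需要完整训练一个模型的昂贵目标函数之前,可以先用一个廉价的测试函数确认优化器本身能正常运行并收敛。下面是一个最小的使用示意(假设上文定义的 vppso 函数在同一脚本中可用;sphere 目标函数与 4 维搜索空间只是演示用的假设,并非原代码的一部分):

```python
import numpy as np

# 一个廉价的测试目标:球函数,最优值 0 位于原点
def sphere(x: np.ndarray) -> float:
    return float(np.sum(x ** 2))

toy_bounds = [(-5.0, 5.0)] * 4   # 4 维搜索空间,仅作示例

best_x, best_f, curve = vppso(
    sphere, dim=4, bounds=toy_bounds,
    N=12, N1=6, N2=6, T=30, verbose=False
)

print("best_x:", np.round(best_x, 3))
print("best_f:", best_f)   # 适应度应随迭代明显下降,用于确认实现没有明显问题
```

确认 best_f 随迭代稳定下降后,再把目标函数换成 eval_model_hyperparams 进行真正的超参数搜索,可以少走很多弯路。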

import json import torch from typing import Dict, List from torch.utils.data import Dataset import transformers from peft import LoraConfig, TaskType, get_peft_model from torch.utils.data import DataLoader, SequentialSampler from transformers import Trainer, TrainingArguments from lora_plus import LoraPlusTrainer from torch.utils.data import RandomSampler from swanlab.integration.transformers import SwanLabCallback import swanlab import numpy as np import pandas as pd import re from typing import Dict, List import torch from tqdm import tqdm from transformers import PreTrainedTokenizer from transformers import AutoTokenizer import torch.nn as nn from lora_plus import LoraPlusTrainer # 确保已安装lora_plus库 from transformers import PreTrainedModel # 初始化SwanLab swanlab.init("Finetune-Llama3.2-with-Encoder") swanlab_callback = SwanLabCallback( project="Finetune-Llama3.2-with-Encoder", experiment_name="Finetune-Llama3.2-with-Encoder" ) # 常量定义 CHEM_FORMULA_SIZE = r"([A-Z][a-z]*)([0-9]*)" VALID_ELEMENTS = ["C", "N", "P", "O", "S", "Si", "I", "H", "Cl", "F", "Br", "B", "Se", "Fe", "Co", "As", "K", "Na"] element_to_idx = {elem: idx for idx, elem in enumerate(VALID_ELEMENTS)} # 化学式转密集向量 def formula_to_dense(chem_formula: str) -> torch.Tensor: dense_vec = torch.zeros(len(VALID_ELEMENTS), dtype=torch.float32) matches = re.findall(CHEM_FORMULA_SIZE, chem_formula) for chem_symbol, num_str in matches: num = 1 if num_str == "" else int(num_str) if chem_symbol in element_to_idx: idx = element_to_idx[chem_symbol] dense_vec[idx] += num return dense_vec # 位置编码生成 (PyTorch实现) def positional_encoding(max_position: int, d_model: int, min_freq: float = 1e-4) -> torch.Tensor: position = torch.arange(max_position).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(min_freq)) / d_model)) pos_enc = torch.zeros(max_position, d_model) pos_enc[:, 0::2] = torch.sin(position * div_term) pos_enc[:, 1::2] = torch.cos(position * div_term) return pos_enc # 初始化位置编码矩阵 P = positional_encoding(2000000, 254) dimn = 254 # 与位置编码维度一致 # 质谱数据编码 def encode_spectra(rag_tensor: list, P: torch.Tensor, dimn: int) -> torch.Tensor: encoded_list = [] for sample in rag_tensor: mz_list, intensity_list = sample # 创建基础特征矩阵 [m/z, intensity] base_features = torch.tensor([mz_list, intensity_list], dtype=torch.float32).T # 添加位置编码特征 pos_enc = torch.stack([P[min(int(mz), P.size(0)-1)] for mz in mz_list]) # 组合所有特征 [m/z, intensity, pos_enc...] 
features = torch.cat([base_features, pos_enc], dim=1) # 填充/截断到固定长度 if features.size(0) < 501: padding = torch.zeros(501 - features.size(0), features.size(1)) features = torch.cat([features, padding], dim=0) else: features = features[:501] encoded_list.append(features) return torch.stack(encoded_list) # 质谱数据预处理 def preprocess_spectra(df: pd.DataFrame) -> list: spectra_list = [] for idx, row in tqdm(df.iterrows(), total=len(df)): spectrum_str = row['Spectrum'] total_mass = row['Total Exact Mass'] # 解析质谱字符串 pairs = spectrum_str.split() mz_list, intensity_list = [], [] for pair in pairs: mz, intensity = pair.split(':') mz_list.append(float(mz)) intensity_list.append(float(intensity)) # 添加总精确质量 mz_list.append(total_mass) intensity_list.append(0.0) # 四舍五入处理 mz_list = [round(mz, 2) for mz in mz_list] intensity_list = [round(intensity, 2) for intensity in intensity_list] spectra_list.append([mz_list, intensity_list]) return spectra_list class MolecularDataset(Dataset): def __init__(self, csv_path: str, tokenizer: AutoTokenizer, max_seq_len: int = 512): self.df = pd.read_csv(csv_path) self.tokenizer = tokenizer self.max_seq_len = max_seq_len self.pad_token_id = tokenizer.pad_token_id # 预处理质谱数据 spectra_data = preprocess_spectra(self.df) self.spec_encoded = encode_spectra(spectra_data, P, dimn) def __len__(self): return len(self.df) def __getitem__(self, idx) -> dict: # 分子式向量和质谱矩阵(保持不变) formula = self.df.iloc[idx]['Molecular Formula'] formula_vec = formula_to_dense(formula).unsqueeze(0) spec_matrix = self.spec_encoded[idx] # SELFIES目标序列 selfies_str = self.df.iloc[idx]['SELFIES'] encoding = self.tokenizer( selfies_str, add_special_tokens=True, # 包含 padding='max_length', truncation=True, max_length=self.max_seq_len, return_tensors='pt' ) # 输入序列仅包含开始符号 input_ids = encoding['input_ids'].squeeze(0) attention_mask = encoding['attention_mask'].squeeze(0) # 标签为完整的目标序列(替换padding为-100) labels = input_ids.clone() labels[labels == self.pad_token_id] = -100 return { 'encoder1_inputs': formula_vec, 'encoder2_inputs': spec_matrix, 'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': labels } # 加载tokenizer tokenizer = AutoTokenizer.from_pretrained('/root/workspace/d21lv5s7v38s73b4ddlg/checkpoint-2500') # 创建数据集 dataset = MolecularDataset('/root/workspace/d21lv5s7v38s73b4ddlg/SELFIES-SFT.csv', tokenizer) def custom_collator(features: List[Dict]) -> Dict: batch = { 'encoder1_inputs': torch.stack([f['encoder1_inputs'] for f in features]), # 形状:(batch_size, 1, 18) 'encoder2_inputs': torch.stack([f['encoder2_inputs'] for f in features]), # 形状:(batch_size, 501, 258) 'input_ids': torch.stack([f['input_ids'] for f in features]), 'attention_mask': torch.stack([f['attention_mask'] for f in features]), 'labels': torch.stack([f['labels'] for f in features]), } return batch class LlamaWithEncoder(PreTrainedModel): def __init__(self, base_model, encoder1_dim=18, encoder2_dim=256, hidden_dim=512): # 添加config属性,这是PreTrainedModel要求的 self.config = base_model.config super().__init__(self.config) # 将基础模型赋值给model属性,PEFT库会寻找这个属性 self.model = base_model # 关键修改:添加model属性 # 第一个Transformer Encoder encoder1_layer = nn.TransformerEncoderLayer( d_model=encoder1_dim, nhead=3, # 18能被3整除 dim_feedforward=hidden_dim, batch_first=True ) self.encoder1 = nn.TransformerEncoder(encoder1_layer, num_layers=2) # 第二个Transformer Encoder(适配256维输入) encoder2_layer = nn.TransformerEncoderLayer( d_model=encoder2_dim, nhead=8, # 256能被8整除 dim_feedforward=hidden_dim, batch_first=True ) self.encoder2 = nn.TransformerEncoder(encoder2_layer, num_layers=2) 
# 投影层 self.proj1 = nn.Linear(encoder1_dim, base_model.config.hidden_size) self.proj2 = nn.Linear(encoder2_dim, base_model.config.hidden_size) # 融合层 self.fusion = nn.Linear(2 * base_model.config.hidden_size, base_model.config.hidden_size) # 添加这个方法,帮助PEFT正确识别基础模型 def get_base_model(self): return self.model # 重写这个方法,确保模块结构正确 def _get_name(self): return "LlamaWithEncoder" def prepare_inputs_for_generation(self, input_ids, past_key_values=None, **kwargs): return self.base_model.prepare_inputs_for_generation( input_ids, past_key_values=past_key_values,** kwargs ) def forward( self, input_ids=None, # 仅用于接收输入,但调用基础模型时不传递 attention_mask=None, encoder1_inputs=None, encoder2_inputs=None, labels=None, past_key_values=None, output_attentions=None, output_hidden_states=None, return_dict=None, **kwargs # 可能包含冗余参数,需过滤 ): # 处理编码器输入 enc1_out = self.encoder1(encoder1_inputs) # (batch_size, 1, 18) enc1_out = enc1_out.mean(dim=1) # (batch_size, 18) enc1_proj = self.proj1(enc1_out) # (batch_size, hidden_size) enc2_out = self.encoder2(encoder2_inputs) # (batch_size, 501, 256) enc2_out = enc2_out.mean(dim=1) # (batch_size, 256) enc2_proj = self.proj2(enc2_out) # (batch_size, hidden_size) # 融合编码器输出 fused = self.fusion(torch.cat([enc1_proj, enc2_proj], dim=1)) # (batch_size, hidden_size) fused = fused.unsqueeze(1) # (batch_size, 1, hidden_size) # 获取嵌入层输出(基于输入的input_ids,但后续会被融合结果修改) embeddings = self.base_model.get_input_embeddings()(input_ids) # (batch_size, seq_len, hidden_size) # 将融合结果与第一个token的嵌入结合 if embeddings.size(1) > 0: embeddings[:, 0, :] = (embeddings[:, 0, :] + fused[:, 0, :]) / 2 # 调用基础模型时,仅传递必要参数,排除input_ids(与inputs_embeds互斥) return self.base_model( inputs_embeds=embeddings, # 传递修改后的嵌入,替代input_ids attention_mask=attention_mask, labels=labels, past_key_values=past_key_values, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, # 移除**kwargs,避免传递冗余参数(如input_ids)导致冲突 ) # 加载预训练模型 base_model = transformers.AutoModelForCausalLM.from_pretrained( "/root/workspace/d21lv5s7v38s73b4ddlg/checkpoint-2500", trust_remote_code=True, torch_dtype=torch.bfloat16, ) model = LlamaWithEncoder(base_model) lora_config = LoraConfig( r=8, lora_alpha=16, target_modules="all-linear", # 目标注意力层 lora_dropout=0.0, bias="none", task_type="CAUSAL_LM" ) model = get_peft_model(model, lora_config) model.print_trainable_parameters() # 输出示例:0.3% 参数可训练 training_args = TrainingArguments( output_dir="./llama3.2-SELFIES-SFT", per_device_train_batch_size=24, gradient_accumulation_steps=24, num_train_epochs=1, learning_rate=5.0e-05, optim="adamw_torch", logging_steps=10, bf16=True, save_strategy="steps", lr_scheduler_type='cosine', max_grad_norm=1.0, save_steps=2000, warmup_steps=0 ) class CustomTrainer(LoraPlusTrainer): def get_train_dataloader(self) -> DataLoader: """ Returns the training dataloader using a random sampler to shuffle the dataset. 
""" return DataLoader( self.train_dataset, batch_size=self.args.train_batch_size, shuffle=True, collate_fn=self.data_collator, drop_last=False, ) # 使用修改后的 CustomTrainer lp_trainer = CustomTrainer( model, training_args, train_dataset=dataset, tokenizer=tokenizer, data_collator=custom_collator, callbacks=[swanlab_callback], ) lp_trainer.train() lp_trainer.save_model(output_dir='./llama3.2-SELFIES-SFT') # 合并LoRA权重 model = model.merge_and_unload() # 保存整个模型(包括自定义编码器和融合层)为safetensors格式 save_directory = './llama3.2-SELFIES' model.save_pretrained(save_directory, safe_serialization=True) # 同时保存tokenizer tokenizer.save_pretrained(save_directory)解决报错 File "/root/workspace/d21lv5s7v38s73b4ddlg/encoder-sft.py", line 338, in <module> lp_trainer.train() File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 2241, in train return inner_training_loop( File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 2548, in _inner_training_loop tr_loss_step = self.training_step(model, inputs, num_items_in_batch) File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 3698, in training_step loss = self.compute_loss(model, inputs, num_items_in_batch=num_items_in_batch) File "/opt/conda/lib/python3.10/site-packages/transformers/trainer.py", line 3759, in compute_loss outputs = model(**inputs) File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1518, in _wrapped_call_impl return self._call_impl(*args, **kwargs) File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1527, in _call_impl return forward_call(*args, **kwargs) File "/opt/conda/lib/python3.10/site-packages/deepspeed/utils/nvtx.py", line 20, in wrapped_fn ret_val = func(*args, **kwargs) File "/opt/conda/lib/python3.10/site-packages/deepspeed/runtime/engine.py", line 2054, in forward loss = self.module(*inputs, **kwargs) File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1518, in _wrapped_call_impl return self._call_impl(*args, **kwargs) File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1568, in _call_impl result = forward_call(*args, **kwargs) File "/opt/conda/lib/python3.10/site-packages/peft/peft_model.py", line 1845, in forward return self.base_model( File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1518, in _wrapped_call_impl return self._call_impl(*args, **kwargs) File "/opt/conda/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1527, in _call_impl return forward_call(*args, **kwargs) File "/opt/conda/lib/python3.10/site-packages/peft/tuners/tuners_utils.py", line 216, in forward return self.model.forward(*args, **kwargs) File "/root/workspace/d21lv5s7v38s73b4ddlg/encoder-sft.py", line 260, in forward embeddings = self.base_model.get_input_embeddings()(input_ids) # (batch_size, seq_len, hidden_size) File "/opt/conda/lib/python3.10/site-packages/transformers/modeling_utils.py", line 1865, in get_input_embeddings raise NotImplementedError

import pandas as pd import numpy as np import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import Dataset, DataLoader from sklearn.model_selection import train_test_split from tensorflow.keras.preprocessing.text import Tokenizer from tensorflow.keras.preprocessing.sequence import pad_sequences import re import matplotlib.pyplot as plt from tqdm import tqdm from sklearn.metrics import f1_score, precision_score, recall_score # 新增评估指标 # 设备配置(修正引号) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f"Using device: {device}") # 数据预处理(保持不变) def load_and_preprocess_data(): comments = pd.read_csv(r'D:\BaiduNetdiskDownload\电影数据集-CSV格式\comments.csv') if 'RATING' not in comments.columns: raise KeyError("RATING column not found") comments['CONTENT'] = comments['CONTENT'].fillna('').astype(str) comments['CLEAN_CONTENT'] = comments['CONTENT'].apply( lambda x: re.sub(r'[^\w\s]', '', x.lower()) ) comments['LABEL'] = comments['RATING'] - 1 valid_labels = comments['LABEL'].between(0, 4) comments = comments[valid_labels].copy() comments['LABEL'] = comments['LABEL'].astype(np.int32) comments['WEIGHT'] = np.log1p(comments['VOTES']) + 1 return comments[['CLEAN_CONTENT', 'LABEL', 'WEIGHT']] # 自定义数据集(增加数据增强) class SentimentDataset(Dataset): def __init__(self, sequences, labels, weights, augment=False): self.sequences = torch.LongTensor(sequences) self.labels = torch.LongTensor(labels) self.weights = torch.FloatTensor(weights) self.augment = augment # 新增数据增强标志 def __len__(self): return len(self.labels) def __getitem__(self, idx): seq = self.sequences[idx] # 简单数据增强:随机mask if self.augment and np.random.rand() < 0.3: mask = np.random.rand(len(seq)) < 0.1 seq[mask] = 0 # 用0(padding索引)随机mask部分词语 return seq, self.labels[idx], self.weights[idx] # 改进的LSTM模型(添加注意力机制) class BiLSTMSentiment(nn.Module): def __init__(self, vocab_size, embedding_dim=128, hidden_dim=128, num_layers=2): super().__init__() self.embedding = nn.Embedding(vocab_size, embedding_dim) self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, bidirectional=True, batch_first=True, dropout=0.3 if num_layers>1 else 0) self.attention = nn.Linear(hidden_dim*2, 1) # 新增注意力层 self.dropout = nn.Dropout(0.3) self.fc = nn.Sequential( nn.Linear(hidden_dim*2, 64), nn.ReLU(), nn.Dropout(0.3), nn.Linear(64, 5) ) def forward(self, x): x = self.embedding(x) # (batch, seq_len, embedding_dim) lstm_out, (hn, cn) = self.lstm(x) # lstm_out: (batch, seq_len, hidden_dim*2) # 注意力机制 attn_weights = torch.softmax(self.attention(lstm_out).squeeze(-1), dim=1) # (batch, seq_len) context = torch.bmm(attn_weights.unsqueeze(1), lstm_out).squeeze(1) # (batch, hidden_dim*2) return self.fc(self.dropout(context)) # 改进的训练函数(添加学习率调度和梯度裁剪) def train_model(model, train_loader, val_loader, optimizer, epochs=10): history = { 'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'val_f1': [], 'val_precision': [], 'val_recall': [] # 新增指标 } criterion = nn.CrossEntropyLoss(reduction='none') scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, mode='max', factor=0.5, patience=2, verbose=True) # 新增学习率调度 best_acc = 0.0 early_stopping_counter = 0 patience = 3 for epoch in range(epochs): model.train() epoch_loss = 0.0 epoch_correct = 0 total_samples = 0 progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs}', leave=False) for seq, labels, weights in progress_bar: seq, labels, weights = seq.to(device), labels.to(device), weights.to(device) optimizer.zero_grad() outputs = model(seq) loss = 
(criterion(outputs, labels) * weights).mean() loss.backward() # 梯度裁剪 torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) optimizer.step() preds = outputs.argmax(dim=1) correct = (preds == labels).sum().item() epoch_correct += correct epoch_loss += loss.item() * seq.size(0) total_samples += seq.size(0) progress_bar.set_postfix({ 'loss': loss.item(), 'acc': f"{correct/seq.size(0):.2f}" }) # 验证阶段 val_acc, val_loss, val_f1, val_precision, val_recall = evaluate(model, val_loader) train_loss = epoch_loss / total_samples train_acc = epoch_correct / total_samples # 记录历史数据 history['train_loss'].append(train_loss) history['train_acc'].append(train_acc) history['val_loss'].append(val_loss) history['val_acc'].append(val_acc) history['val_f1'].append(val_f1) history['val_precision'].append(val_precision) history['val_recall'].append(val_recall) # 学习率调度 scheduler.step(val_acc) # 打印训练结果 print(f"\nEpoch {epoch+1} Summary:") print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}") print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}") print(f"Val F1: {val_f1:.4f} | Precision: {val_precision:.4f} | Recall: {val_recall:.4f}") # 早停机制 if val_acc > best_acc: best_acc = val_acc early_stopping_counter = 0 torch.save(model.state_dict(), 'best_model.pth') print(f"🚀 New best model saved with accuracy: {best_acc:.4f}") else: early_stopping_counter += 1 print(f"⏳ Early stopping counter: {early_stopping_counter}/{patience}") if early_stopping_counter >= patience: print(f"⛔ Early stopping triggered after {epoch+1} epochs!") break # 增强的可视化 plt.figure(figsize=(18, 6)) metrics = ['loss', 'acc', 'f1'] for i, metric in enumerate(metrics, 1): plt.subplot(1, 3, i) plt.plot(history[f'train_{metric}'], label='Train') plt.plot(history[f'val_{metric}'], label='Validation') plt.title(f'{metric.upper()} Curve') plt.xlabel('Epoch') plt.ylabel(metric.upper()) plt.legend() plt.tight_layout() plt.savefig('training_curves.png') plt.close() return model, history # 改进的评估函数(添加多指标) def evaluate(model, loader): model.eval() total_loss, total_correct = 0, 0 all_preds = [] all_labels = [] criterion = nn.CrossEntropyLoss() with torch.no_grad(): for seq, labels, _ in loader: seq, labels = seq.to(device), labels.to(device) outputs = model(seq) loss = criterion(outputs, labels) total_loss += loss.item() * seq.size(0) preds = outputs.argmax(dim=1) total_correct += (preds == labels).sum().item() all_preds.extend(preds.cpu().numpy()) all_labels.extend(labels.cpu().numpy()) avg_loss = total_loss / len(loader.dataset) accuracy = total_correct / len(loader.dataset) f1 = f1_score(all_labels, all_preds, average='macro') precision = precision_score(all_labels, all_preds, average='macro') recall = recall_score(all_labels, all_preds, average='macro') return accuracy, avg_loss, f1, precision, recall if __name__ == "__main__": # 数据准备流程(增加数据增强) data = load_and_preprocess_data() # 文本向量化 tokenizer = Tokenizer(num_words=50000, oov_token="<OOV>") tokenizer.fit_on_texts(data['CLEAN_CONTENT']) sequences = pad_sequences( tokenizer.texts_to_sequences(data['CLEAN_CONTENT']), maxlen=200, padding='post', truncating='post' ) # 数据集划分 X_train, X_temp, y_train, y_temp, w_train, w_temp = train_test_split( sequences, data['LABEL'].values, data['WEIGHT'].values, test_size=0.3, random_state=42 ) X_val, X_test, y_val, y_test, w_val, w_test = train_test_split( X_temp, y_temp, w_temp, test_size=0.5, random_state=42 ) # 数据加载器(启用训练集数据增强) train_loader = DataLoader( SentimentDataset(X_train, y_train, w_train, augment=True), # 启用数据增强 batch_size=512, 
        shuffle=True
    )
    val_loader = DataLoader(
        SentimentDataset(X_val, y_val, w_val),
        batch_size=512
    )
    test_loader = DataLoader(
        SentimentDataset(X_test, y_test, w_test),
        batch_size=512
    )

    # 模型初始化(使用改进后的模型)
    model = BiLSTMSentiment(
        vocab_size=50000,
        embedding_dim=256,
        hidden_dim=256,
        num_layers=2
    ).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)  # 添加权重衰减

    # 训练流程
    trained_model, history = train_model(
        model, train_loader, val_loader, optimizer, epochs=15
    )

    # 最终测试
    test_acc, test_loss, test_f1, test_precision, test_recall = evaluate(trained_model, test_loader)
    print(f"\n🎯 Final Test Results:")
    print(f"Test Accuracy: {test_acc:.4f}")
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test F1: {test_f1:.4f} | Precision: {test_precision:.4f} | Recall: {test_recall:.4f}")

    # 模型保存
    torch.save({
        'model_state': trained_model.state_dict(),
        'tokenizer_config': tokenizer.to_json(),
        'history': history
    }, 'sentiment_model.pth')
    print("Model saved with training history")

优先实施项:残差连接 + 回译增强 + Focal Loss,Transformer-BiLSTM混合架构 + 自蒸馏。使用以上方法对此代码进行优化,不要对源代码进行删减修改,只是增加新的优化代码,给出完整代码。
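在上面列出的优先实施项中,Focal Loss 是最容易以"只增不删"方式加入的一项。下面是按常见定义写的最小示意实现(gamma、alpha 的取值仅为示例;reduction='none' 是为了兼容原代码中逐样本权重相乘的写法,并非唯一选择):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    """Focal Loss:在交叉熵上乘以 (1 - p_t)^gamma,降低易分类样本对梯度的贡献。"""
    def __init__(self, gamma: float = 2.0, alpha: torch.Tensor = None, reduction: str = 'none'):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha          # 可选的类别权重,形状为 (num_classes,)
        self.reduction = reduction  # 'none' 便于与样本级权重逐元素相乘

    def forward(self, logits: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        # 逐样本交叉熵(不做归约)
        ce = F.cross_entropy(logits, targets, weight=self.alpha, reduction='none')
        pt = torch.exp(-ce)                   # 正确类别的预测概率
        loss = (1.0 - pt) ** self.gamma * ce  # 聚焦难样本
        if self.reduction == 'mean':
            return loss.mean()
        if self.reduction == 'sum':
            return loss.sum()
        return loss

# 用法示意:在 train_model 中把 criterion 换成
#   criterion = FocalLoss(gamma=2.0, reduction='none')
# 即可沿用原有的 (criterion(outputs, labels) * weights).mean() 写法
```

其余几项(残差连接、回译增强、Transformer-BiLSTM 混合架构、自蒸馏)涉及模型结构和数据管线的改动,需要结合原代码单独设计,这里不作展开。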

最新推荐


前端开发基于jQuery的选择器与DOM操作技术:网页元素精准定位及动态交互功能实现

内容概要:本文系统介绍了jQuery的基础知识,涵盖其概念、优势、开发环境搭建、核心语法与选择器、DOM遍历与操作方法,以及事件处理机制。文章强调jQuery作为轻量级JavaScript库在简化DOM操作、跨浏览器兼容性及提升开发效率方面的突出作用,并通过大量代码示例详细讲解了选择器(如标签、类、ID、属性、自定义及表单选择器)、DOM遍历方法(如filter、next、siblings等)、元素访问方式(.get()和索引访问)以及事件绑定与委托(如on、off、hover、ready等),帮助读者掌握jQuery的核心使用技巧。
适合人群:具备HTML、CSS和JavaScript基础,初入前端领域的开发者或希望巩固jQuery基础的1-3年经验研发人员。
使用场景及目标:①快速实现DOM元素选取与操作,提升页面交互开发效率;②理解jQuery事件机制与DOM遍历逻辑,用于传统项目维护或兼容性开发;③为学习现代前端框架前打下扎实的JavaScript操作基础。
阅读建议:建议结合文中示例动手实践,重点理解选择器的使用场景与事件委托机制,注意区分jQuery对象与原生DOM对象的操作差异,并在实际项目中逐步应用所学内容以加深理解。

DeepSeek类AI在图书馆智慧服务中的应用与技术考量.docx


jsfuck自动解码工具.zip

jsfuck自动解码工具

COMSOL仿真模型展现手性结构

本文介绍利用COMSOL软件进行手性结构电磁仿真的具体步骤和技术细节,主要内容涵盖如何构建手性结构的通用模型,以及如何精确计算不同极化状态(如左旋圆极化、右旋圆极化)下的透射率(Tlr, Trl, Trr, Tll)和反射率(Rlr, Rrl, Rrr, Rll)分量。文中特别强调了极化分量计算的关键点,包括正确应用MATLAB脚本进行电场分量的矢量合成与投影运算,确保极化方向的一致性和准确性。此外,还提供了能量守恒检查的方法,用于验证模型的可靠性。
适合人群:从事电磁仿真、光学器件设计的研究人员和工程师,尤其是那些需要深入了解手性材料电磁特性的专业人士。
使用场景及目标:适用于希望掌握COMSOL多物理场仿真工具在手性结构电磁特性研究中的应用,旨在提高仿真精度和效率,避免常见错误,确保仿真结果的可靠性和一致性。
其他说明:文中提供的MATLAB代码片段有助于读者更好地理解和实施具体的计算过程,同时提醒了一些常见的陷阱和注意事项,帮助用户规避潜在的问题。

VMD-MFRFNN_VMD-MFRFNN.zip


Info2007v1.0更新至v2.0:优化管理与前台功能

根据提供的文件信息,可以挖掘出以下知识点: ### 标题知识点: 1. **免费时代WEB程序INFO2007 V1.0:** - 该标题表明存在一个名为INFO2007的WEB程序版本1.0,该版本是在免费时代推出的,可能意味着该程序是开源的或者提供免费下载。 ### 描述知识点: 1. **软件缺陷说明:** - 开发者提到程序存在BUG(程序缺陷),并提供了一个更新和反馈的渠道,说明软件仍在开发中,且有后续版本计划。 2. **联系方式:** - 开发者提供了QQ和邮箱作为联系方式,用于反馈问题或询问更新情况。 3. **Info2007v2.0更新内容:** - 提及了升级后的版本INFO2007v2.0新增功能,包括数据库结构变化(添加会员和公告表)、后台管理功能的增加与优化、前台功能的增加与优化等。 4. **安装要求:** - 软件需要特定的服务器环境支持,比如FSO(文件系统对象)、数据采集功能和JMAIL(邮件发送组件)。 5. **配置与安装细节:** - 对config.asp下的目录配置和pageurlsa变量做了说明,这些通常涉及程序的运行环境和安全设置。 6. **默认登录信息:** - 提供了默认的管理员用户名和密码,以及后台管理的默认目录,这对于安装和测试程序很重要。 7. **使用前的必要步骤:** - 强调了解压后生成静态页面的重要性,这可能是确保网站内容可被正确浏览的前置操作。 ### 标签知识点: 1. **ASP源码其他类别:** - 这表明该程序使用ASP(Active Server Pages)作为后端编程语言,并且归类于其他类别,可能意味着它不局限于某一特定功能或领域。 ### 压缩包文件名称列表知识点: 1. **www.codejia.com:** - 这个文件名可能指示了程序被托管或下载的来源网站,也暗示了可能含有与网站域名相关的程序文件。 ### 综合知识点: 1. **软件开发与维护:** - 从描述中可以看出开发者在推动软件的持续改进,并鼓励用户参与软件的测试和反馈过程。 2. **软件环境配置:** - 软件对运行环境有所要求,特别是服务器端的支持,需要了解FSO、数据采集、JMAIL等组件的使用和配置。 3. **后台管理系统:** - 更新内容中提及的后台管理功能,如会员管理、公告管理、文章管理等,显示了该程序提供了一套用于网站内容和用户管理的后台解决方案。 4. **前台展示优化:** - 对前台页面的优化和增加功能,如会员注册、文章页、下载页和分类栏目的改进,说明了对用户体验的重视。 5. **安全与权限控制:** - 默认用户名和密码的提供,以及后台目录的默认设置,强调了安装过程中应立即更改编译以提高安全性。 6. **静态页面生成:** - 生成静态页面作为必要步骤可能涉及到网站的性能优化和安全措施。 7. **开源与社区支持:** - 由于提及了更新的可能和用户反馈渠道,这表明软件具有一定的开源特性或至少鼓励社区参与。 综上所述,这些知识点涵盖了软件开发的常见方面,包括软件生命周期的维护、功能更新、环境配置、安全实践以及优化用户体验。了解和掌握这些知识点可以帮助开发者和用户更好地利用和改进免费时代WEB程序INFO2007 V1.0。

Rust测试实战:错误处理、环境变量与模拟服务器

### Rust 测试实战:错误处理、环境变量与模拟服务器

在 Rust 开发中,测试是确保代码质量和稳定性的重要环节。本文将深入探讨 Rust 中的测试技巧,包括错误处理、使用环境变量测试 Config 模块以及使用模拟服务器测试 profanity 模块。

#### 1. 错误处理与比较

在 Rust 中,我们可以为自定义错误类型实现 `std::fmt::Display` 特征,以便将错误转换为字符串。以下是一个示例:

```rust
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::For
```

请分析下面代码:<tbody> <#if (paginationSupport.items)?has_content> <#list paginationSupport.items?sort_by('caseNo') as s> <tr class="b"> <td><a href="../user/viewRequestForm.action?requestFormId=${s.id}">${s.caseNo?default("Not Assigned")?if_exists}</a></td> <td>${s.lotId?if_exists}</td> <td><@m.directoryLink s.applicant?if_exists /></td> <td>${s.prodId?if_exists}</td> <td>${s.lotStageId?if_exists}</td> <td>${s.status?if_exists}</td> <td> <#if s.status!="Waiting For STR"> <#if s.workflowId?exists> <#list s.queuedOwners as owner> <#if owner?matches("[MmAaEeTt]\\d{3}.*")> <@m.directoryLink owner/> <#elseif owner?matches("[K-Z][A-Z][A-Z].*[_].*")> <#list adminService.loadGroupMembersByName(owner) as member> <@m.directoryLink member/><!--by Group--> </#list> <#else> <@m.orgManager owner?if_exists/><!--by Org--> </#if> </#list> </#if> <#else> <#if s.ercCategory=="Input the STR Number"||s.ercCategory=="STR"||s.ercCategory=="MSTR"> STR NO:<a href="${ercService.strLink?if_exists}${s.strEnNo?if_exists}" target="_blank">${s.strEnNo?if_exists}</a> is not effective <#else> EN NO:<a href="${ercService.enLink?if_exists}${s.strEnNo?if_exists}" target="_blank">${s.strEnNo?if_exists}</a> is not effective </#if> </#if> </td> <td><#if s.submitDate?exists>${s.submitDate?datetime?if_exists}</#if></td> <td>${s.purpose?default('')}</td> </tr> </#list> </#if> </tbody>

这段代码是使用 FreeMarker 模板引擎生成 HTML 表格内容的一部分,主要功能是根据分页数据展示请求表单的信息。以下是代码的详细分析:

---

### 1. **条件判断与循环**

```html
<#if (paginationSupport.items)?has_content>
```

判断 `paginationSupport.items` 是否存在且非空,只有存在数据时才执行表格渲染。

```html
<#list paginationSupport.items?sort_by('caseNo') as s>
```

对 `paginationSupport.item

op-drone:监控未平仓头寸的市场利器

标题中提到的“op-drone”很可能是指一个特定的监控系统或服务的名称,它被用来“持续监控市场中的未平仓头寸”。未平仓头寸是指在金融市场上尚未完成交易结算的买卖双方的持仓数量。监控未平仓头寸对于市场参与者来说至关重要,因为它可以提供有关市场流动性和投资者情绪的重要信息。 在深入探讨之前,需要了解几个基础概念: 1. 未平仓头寸:指交易者在期货、期权、外汇或其他衍生品市场上建立的但尚未平仓的头寸。这些头寸在到期前仍然具有价值,而且市场上的价格变动会对它们的总体价值产生影响。 2. 持续监控:这通常是指使用软件工具或服务不断跟踪和分析市场数据的过程。持续监控可帮助交易者或市场分析师及时捕捉市场的动态变化,并根据最新情况做出交易决策。 3. 市场监控系统:这类系统通常具备收集实时数据、分析市场趋势、识别异常交易行为等多种功能。它们对于投资者了解市场状况、进行风险管理以及制定交易策略至关重要。 从描述中可以推断出,op-drone是一个专门用于持续监控未平仓头寸的系统或服务。这种系统需要具备以下功能: 1. 数据收集:系统需要有能力实时收集金融市场中的数据,包括但不限于期货、期权、股票、债券等金融产品的交易信息。 2. 数据分析:通过算法或机器学习技术分析收集到的数据,识别市场趋势、投资者行为模式以及潜在风险。 3. 异常检测:能够识别出市场中的异常交易活动,比如未平仓头寸的急剧变化,这可能是市场重大变动的前兆。 4. 风险预警:系统应能向用户发出风险预警,告知用户潜在的市场风险,帮助他们进行风险管理。 5. 报告与可视化:提供详细的数据报告和可视化图表,帮助用户更直观地理解市场状况和未平仓头寸变化。 此外,虽然文件中未提供标签和具体的文件名称列表,但可以推测“op-drone-main”可能是系统中的一个核心组件或主程序的名称。这个组件可能是整个op-drone系统运行的基础,负责处理大部分的监控和分析功能。 综合以上信息,我们可以得出一个结论:op-drone是一个专门设计用于监控金融市场上未平仓头寸的系统或服务。它可能具备实时数据收集和分析、异常行为识别、风险预警以及报告和可视化展示等多种功能,从而帮助用户在复杂的市场环境中做出更加明智的决策。对于需要精确把握市场动态、进行风险管理的金融分析师、交易员、投资组合经理等专业人士来说,这样的系统是不可或缺的工具。

Rust Web服务部署与环境变量配置全解析

### Rust Web 服务部署与环境变量配置全解析

在 Rust 开发中,完成业务逻辑编写后,将应用部署到生产环境是关键的下一步。本文将深入探讨 Rust 应用部署过程中的重要环节,包括环境变量的配置、二进制文件的优化以及跨平台编译等内容。

#### 1. 认证与授权相关未覆盖内容

在认证和授权方面,有一些内容未详细涉及。例如,设置会话数据库来存储令牌,以便在数据泄露或用户有其他需求时使令牌失效。可以通过设置 Redis 实例,在生成每个令牌时,不仅将结果返回给客户端,还将其存储在 Redis 键值存储中。

另外,重置用户密码和创建刷新令牌也是重要的话题。重置用户密码可以通过创建一个