```python
from transformers import AutoModelForTokenClassification, AutoTokenizer, DataCollatorForTokenClassification
from transformers import TrainingArguments, Trainer
import evaluate  # note: imported but not used below
from datasets import load_dataset
import numpy as np
from seqeval.metrics import classification_report, f1_score, precision_score, recall_score
import torch
import os
import json
import logging
from collections import Counter
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt

# Configure verbose logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Select device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")

# Model path
model_dir = 'C:/Users/admin/.cache/modelscope/hub/tiansz/bert-base-chinese'

# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_dir)
logger.info("Tokenizer loaded")

# Load the dataset
path = r'C:\Users\admin\八斗学院学习笔记\练习\week11_codes\data'
dataset = load_dataset(path)
logger.info(f"Dataset loaded: {dataset}")

# Collect all entity labels
labels = set()
for row in dataset['train']:
    for ent in row['entities']:
        labels.add(ent['label'])

# Build the full tag list (with B- and I- prefixes)
entities = sorted(list(labels))  # sort for consistency
tags = ['O']
for ent in entities:
    tags.append('B-' + ent.upper())
    tags.append('I-' + ent.upper())

# Tag <-> ID mappings
tag2id = {tag: i for i, tag in enumerate(tags)}
id2tag = {i: tag for i, tag in enumerate(tags)}
logger.info(f"Number of tags: {len(tags)}")
logger.info(f"Tag list: {tags}")

# Save the tag mappings
os.makedirs('label_mappings', exist_ok=True)
with open('label_mappings/tag2id.json', 'w', encoding='utf-8') as f:
    json.dump(tag2id, f, ensure_ascii=False, indent=2)
with open('label_mappings/id2tag.json', 'w', encoding='utf-8') as f:
    json.dump(id2tag, f, ensure_ascii=False, indent=2)
logger.info("Tag mappings saved to the label_mappings directory")

# Entity processing function ("fixed" version): make sure the tags are correct
def entities_proc(items):
    text = items['text']
    items_len = len(text)
    ent_tags = ['O'] * items_len  # initialize everything to 'O'
    entities = items['entities']
    for ent in entities:
        start = ent['start_offset']
        end = ent['end_offset']
        label = ent['label'].upper()  # normalize to upper case
        # Make sure the span is in range
        if start < items_len:
            # Set the B- tag
            ent_tags[start] = f'B-{label}'
            # Set the I- tags (for multi-character entities)
            for pos in range(start + 1, min(end, items_len)):
                ent_tags[pos] = f'I-{label}'
    # Convert to IDs (unknown tags fall back to 'O')
    tag_ids = [tag2id.get(tag, tag2id['O']) for tag in ent_tags]
    return {'ent_tags': tag_ids}

# Apply entity processing
logger.info("Processing entity tags...")
ds = dataset.map(entities_proc)

# Analyze the label distribution
def analyze_label_distribution(dataset, split):
    logger.info(f"\nAnalyzing {split} label distribution...")
    all_labels = []
    for example in dataset[split]:
        all_labels.extend([id2tag.get(tag_id, 'O') for tag_id in example['ent_tags']])
    label_counts = Counter(all_labels)
    total_labels = len(all_labels)
    logger.info(f"Total labels: {total_labels}")
    for label, count in label_counts.items():
        percentage = (count / total_labels) * 100
        logger.info(f"{label}: {count} ({percentage:.2f}%)")
    # Proportion of entity labels
    entity_labels = [label for label in label_counts.keys() if label != 'O']
    entity_count = sum([label_counts[label] for label in entity_labels])
    entity_percentage = (entity_count / total_labels) * 100
    logger.info(f"Total entity labels: {entity_count} ({entity_percentage:.2f}%)")
    logger.info(f"Non-entity labels (O): {label_counts['O']} ({100-entity_percentage:.2f}%)")
    # Visualize the label distribution
    plt.figure(figsize=(12, 6))
    labels_list = list(label_counts.keys())
    counts = [label_counts[l] for l in labels_list]
    plt.bar(labels_list, counts)
    plt.xticks(rotation=90)
    plt.title(f'{split} label distribution')
    plt.ylabel('Count')
    plt.tight_layout()
    plt.savefig(f'label_distribution_{split}.png')
    logger.info(f"Label distribution plot saved as label_distribution_{split}.png")
    return label_counts

# Analyze the label distributions
train_label_counts = analyze_label_distribution(ds, 'train')
test_label_counts = analyze_label_distribution(ds, 'test')

# Compute class weights to address class imbalance
logger.info("\nComputing class weights to address class imbalance...")
all_train_labels = []
for example in ds['train']:
    all_train_labels.extend(example['ent_tags'])

# note: this assumes every tag ID occurs in the training data;
# otherwise the weight vector is shorter than the tag list
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(all_train_labels),
    y=all_train_labels
)
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

# Boost the weights of entity tags
for i, tag in enumerate(tags):
    if tag != 'O':
        class_weights[i] *= 5.0  # increase entity tag weights
logger.info(f"Class weights: {class_weights}")

# Input processing function ("fixed" version, uses offset_mapping)
def data_input_proc(items):
    # Get the list of texts
    texts = items['text']
    # Tokenize (with offset mapping enabled)
    tokenized_inputs = tokenizer(
        texts,
        truncation=True,
        padding=True,
        max_length=128,
        return_offsets_mapping=True,
        is_split_into_words=False,
        return_tensors="pt",
    )
    # Pop the offset mappings
    offset_mappings = tokenized_inputs.pop('offset_mapping')
    all_labels = []
    for i, text in enumerate(texts):
        # Character-level tag IDs for this example
        char_tags = items['ent_tags'][i]
        label_ids = []
        offsets = offset_mappings[i]
        for j, offset in enumerate(offsets):
            start, end = offset
            # Special tokens ([CLS], [SEP], [PAD]) get -100
            if start == 0 and end == 0:
                label_ids.append(-100)
            else:
                # Use the tag of the first character the token covers
                if start < len(char_tags):
                    label_ids.append(char_tags[start])
                else:
                    # Truncation case
                    label_ids.append(-100)
        all_labels.append(label_ids)
    tokenized_inputs["labels"] = all_labels
    return tokenized_inputs

# Apply input processing
logger.info("Processing input data...")
ds1 = ds.map(
    data_input_proc,
    batched=True,
    batch_size=8,
    remove_columns=ds["train"].column_names
)

# Training arguments
args = TrainingArguments(
    output_dir='ner_train',
    num_train_epochs=3,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=8,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="epoch",
    load_best_model_at_end=False,  # disabled until the metric issue is resolved
    report_to='tensorboard',
    logging_dir='ner_train/logs',
    save_total_limit=2,
    learning_rate=5e-5,
    weight_decay=0.01,
    overwrite_output_dir=True,
    warmup_ratio=0.1,
    gradient_accumulation_steps=4,
    fp16=torch.cuda.is_available(),
    logging_steps=20,
    remove_unused_columns=False,
    # metric_for_best_model is intentionally not set for now
)

# Custom model wrapper with a class-weighted loss
class WeightedLossModel(torch.nn.Module):
    def __init__(self, model, class_weights):
        super().__init__()
        self.model = model
        self.class_weights = class_weights
        self.loss_fct = torch.nn.CrossEntropyLoss(weight=class_weights, ignore_index=-100)

    def forward(self, **inputs):
        outputs = self.model(**inputs)
        logits = outputs.logits
        if "labels" in inputs:
            loss = self.loss_fct(logits.view(-1, self.model.config.num_labels), inputs["labels"].view(-1))
            outputs.loss = loss
        return outputs

# Load the base model
logger.info("Loading base model...")
base_model = AutoModelForTokenClassification.from_pretrained(
    model_dir,
    num_labels=len(tags),
    id2label=id2tag,
    label2id=tag2id,
    ignore_mismatched_sizes=True
).to(device)

# Wrap it with the weighted loss
model = WeightedLossModel(base_model, class_weights).to(device)
logger.info(f"Model structure: {base_model.__class__.__name__} with class-weighted loss")
logger.info(f"Number of parameters: {sum(p.numel() for p in model.parameters()):,}")

# Data collator
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

# Metric computation (with extra debugging)
def compute_metrics(p):
    try:
        # Make sure the input format is as expected
        if hasattr(p, "predictions") and hasattr(p, "label_ids"):
            predictions = p.predictions
            labels = p.label_ids
        elif isinstance(p, tuple) and len(p) == 2:
            predictions, labels = p
        else:
            logger.error(f"Unrecognized input format: {type(p)}")
            return {"f1": 0.0, "precision": 0.0, "recall": 0.0, "accuracy": 0.0}
        # Make sure the predictions have the expected shape
        if predictions.ndim == 3:
            predictions = np.argmax(predictions, axis=2)
        # Drop the ignored indices (-100)
        true_predictions = []
        true_labels = []
        for i in range(len(predictions)):
            preds = []
            lbls = []
            for j in range(len(predictions[i])):
                if labels[i][j] != -100:
                    preds.append(id2tag[predictions[i][j]])
                    lbls.append(id2tag[labels[i][j]])
            true_predictions.append(preds)
            true_labels.append(lbls)
        # Compute the metrics
        precision = 0.0
        recall = 0.0
        f1 = 0.0
        accuracy = 0.0
        if true_labels and any(true_labels):  # make sure the label lists are non-empty
            try:
                precision = precision_score(true_labels, true_predictions, zero_division=0)
                recall = recall_score(true_labels, true_predictions, zero_division=0)
                f1 = f1_score(true_labels, true_predictions, zero_division=0)
                # Token-level accuracy
                total = 0
                correct = 0
                for preds, lbls in zip(true_predictions, true_labels):
                    for p, l in zip(preds, lbls):
                        total += 1
                        if p == l:
                            correct += 1
                accuracy = correct / total if total > 0 else 0.0
                # Classification report
                report = classification_report(true_labels, true_predictions, output_dict=True, zero_division=0)
                # Per-entity performance
                logger.info("\n===== Detailed classification report =====")
                for tag in tags:
                    if tag in report and tag != 'O':
                        logger.info(f"{tag}: P={report[tag]['precision']:.4f}, R={report[tag]['recall']:.4f}, F1={report[tag]['f1-score']:.4f}")
                # Log the first few predictions and labels
                logger.info("\n===== Sample prediction check =====")
                for i in range(min(3, len(true_labels))):
                    logger.info(f"Sample {i} predictions: {' '.join(true_predictions[i][:20])}")
                    logger.info(f"Sample {i} labels: {' '.join(true_labels[i][:20])}")
                    logger.info("")
                # Accuracy on non-O tags
                non_o_correct = 0
                non_o_total = 0
                for preds, lbls in zip(true_predictions, true_labels):
                    for p, l in zip(preds, lbls):
                        if l != 'O':
                            non_o_total += 1
                            if p == l:
                                non_o_correct += 1
                non_o_accuracy = non_o_correct / non_o_total if non_o_total > 0 else 0.0
                logger.info(f"Non-O tag accuracy: {non_o_accuracy:.4f} ({non_o_correct}/{non_o_total})")
            except Exception as inner_e:
                logger.error(f"Error while computing metrics: {inner_e}")
                logger.error(f"Prediction shape: {predictions.shape}")
                logger.error(f"Label shape: {labels.shape}")
                logger.error(f"true_labels length: {len(true_labels)}")
                logger.error(f"true_labels sample: {true_labels[:1] if true_labels else 'empty'}")
        else:
            logger.warning("No valid labels during evaluation!")
        return {
            "f1": f1,
            "precision": precision,
            "recall": recall,
            "accuracy": accuracy
        }
    except Exception as outer_e:
        logger.error(f"compute_metrics failed with a fatal error: {outer_e}")
        return {
            "f1": 0.0,
            "precision": 0.0,
            "recall": 0.0,
            "accuracy": 0.0
        }

# Create the Trainer
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=ds1['train'],
    eval_dataset=ds1['test'],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer
)

# Pre-evaluation test
logger.info("Running a pre-evaluation test...")
try:
    eval_result = trainer.evaluate()
    logger.info(f"Pre-evaluation result: {eval_result}")
    # Check whether our metrics came back
    if "eval_f1" in eval_result:
        logger.info("Metric computation works!")
        # Re-enable best-model saving
        args.load_best_model_at_end = True
        args.metric_for_best_model = "f1"  # base name; Trainer adds the "eval_" prefix itself
        trainer = Trainer(
            model=model,
            args=args,
            train_dataset=ds1['train'],
            eval_dataset=ds1['test'],
            data_collator=data_collator,
            compute_metrics=compute_metrics,
            tokenizer=tokenizer
        )
    else:
        logger.warning("Metrics were not returned; best-model saving stays disabled")
except Exception as e:
    logger.error(f"Pre-evaluation failed: {e}")

# Train the model
logger.info("Starting training...")
try:
    train_result = trainer.train()
    logger.info("Training finished!")
except Exception as e:
    logger.error(f"Training failed: {e}")
    # Retry with a smaller learning rate
    logger.info("Retrying with a smaller learning rate...")
    args.learning_rate = 1e-5
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=ds1['train'],
        eval_dataset=ds1['test'],
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer
    )
    train_result = trainer.train()
    logger.info("Training finished!")

# Save the final model
trainer.save_model("final_ner_model")
logger.info("Final model saved to the final_ner_model directory")

# Evaluate the model after training
logger.info("\n===== Final evaluation =====")
try:
    eval_results = trainer.evaluate(ds1['test'])
    logger.info(f"Test set accuracy: {eval_results.get('eval_accuracy', 0.0):.4f}")
    logger.info(f"Test set precision: {eval_results.get('eval_precision', 0.0):.4f}")
    logger.info(f"Test set recall: {eval_results.get('eval_recall', 0.0):.4f}")
    logger.info(f"Test set F1: {eval_results.get('eval_f1', 0.0):.4f}")
    # Print every available metric
    logger.info("\nAll available metrics:")
    for key, value in eval_results.items():
        logger.info(f"{key}: {value}")
except Exception as e:
    logger.error(f"Final evaluation failed: {e}")

# Plot the training loss curves
def plot_training_history(history):
    try:
        train_loss = [log['loss'] for log in history if 'loss' in log]
        eval_loss = [log['eval_loss'] for log in history if 'eval_loss' in log]
        if not train_loss:
            logger.warning("No training loss data to plot")
            return
        epochs = list(range(1, len(train_loss) + 1))
        plt.figure(figsize=(12, 6))
        plt.plot(epochs, train_loss, 'b-', label='train loss')
        if eval_loss:
            plt.plot(epochs[:len(eval_loss)], eval_loss, 'r-', label='eval loss')
        plt.title('Training and validation loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)
        plt.savefig('training_loss.png')
        logger.info("Loss plot saved as training_loss.png")
        # Plot F1 (if available)
        eval_f1 = [log['eval_f1'] for log in history if 'eval_f1' in log]
        if eval_f1:
            plt.figure(figsize=(12, 6))
            plt.plot(epochs[:len(eval_f1)], eval_f1, 'g-', label='eval F1')
            plt.title('Validation F1 score')
            plt.xlabel('Epochs')
            plt.ylabel('F1')
            plt.legend()
            plt.grid(True)
            plt.savefig('eval_f1.png')
            logger.info("F1 plot saved as eval_f1.png")
    except Exception as e:
        logger.error(f"Error while plotting training history: {e}")

# Plot the training history
plot_training_history(trainer.state.log_history)

# Try the model on a few samples
logger.info("\nTesting model predictions...")
sample_texts = [
    "我在北京大学学习人工智能",
    "马云是阿里巴巴集团的创始人",
    "上海市浦东新区张江高科技园区"
]

# Custom prediction function
def predict_entities(text, model, tokenizer, id2tag, device):
    try:
        # Tokenize
        inputs = tokenizer(
            text,
            return_tensors="pt",
            return_offsets_mapping=True,
            truncation=True,
            max_length=128
        )
        # Move inputs to the device
        input_ids = inputs['input_ids'].to(device)
        attention_mask = inputs['attention_mask'].to(device)
        token_type_ids = inputs.get('token_type_ids', None)
        if token_type_ids is not None:
            token_type_ids = token_type_ids.to(device)
        offset_mapping = inputs['offset_mapping'].cpu().numpy()[0]
        # Predict
        model.eval()
        with torch.no_grad():
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids
            )
        # Get the predicted tag IDs
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=-1).cpu().numpy()[0]
        # Extract entities
        entities = []
        current_entity = None
        tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
        logger.info(f"\nText: {text}")
        logger.info(f"Tokens: {tokens}")
        for i, (pred, offset) in enumerate(zip(predictions, offset_mapping)):
            # Skip special tokens
            if offset[0] == 0 and offset[1] == 0:
                continue
            tag = id2tag[pred]
            logger.info(f"Token {i}: {tokens[i]} -> {tag} (span: {offset[0]}-{offset[1]})")
            # Handle the tag
            if tag.startswith('B-'):
                # Close the previous entity
                if current_entity:
                    entities.append(current_entity)
                # Start a new entity
                entity_type = tag[2:]
                start = offset[0]
                end = offset[1]
                current_entity = {
                    'start': start,
                    'end': end,
                    'type': entity_type,
                    'text': text[start:end]
                }
            elif tag.startswith('I-'):
                # Extend the current entity
                if current_entity and current_entity['type'] == tag[2:]:
                    current_entity['end'] = offset[1]
                    current_entity['text'] = text[current_entity['start']:offset[1]]
                else:
                    # An I- tag without a matching B- tag starts a new entity
                    entity_type = tag[2:]
                    start = offset[0]
                    end = offset[1]
                    current_entity = {
                        'start': start,
                        'end': end,
                        'type': entity_type,
                        'text': text[start:end]
                    }
            else:  # O
                # Close the previous entity
                if current_entity:
                    entities.append(current_entity)
                    current_entity = None
        # Add the last entity
        if current_entity:
            entities.append(current_entity)
        return entities
    except Exception as e:
        logger.error(f"Error during prediction: {e}")
        return []

# Run the predictions
for text in sample_texts:
    logger.info(f"\n===== Predicting: {text} =====")
    try:
        entities = predict_entities(text, model, tokenizer, id2tag, device)
        if not entities:
            logger.info("  No entities found")
        else:
            for ent in entities:
                logger.info(f"  Entity: {ent['text']}, type: {ent['type']}, span: {ent['start']}-{ent['end']}")
    except Exception as e:
        logger.error(f"Error predicting text '{text}': {e}")

logger.info("\nNER task finished!")
```
What does this code do, and is there anything that could be optimized?
### Code Functionality Analysis
This Java code implements file copying. Specifically, it:
1. **Copies a source file to a destination file**: from `/Users/.../1.txt` to `/Users/.../2.txt`
2. **Optimizes with buffered streams**: wraps the raw file streams in `BufferedInputStream` and `BufferedOutputStream`, using an in-memory buffer to reduce the number of disk I/O operations
3. **Handles exceptions**: catches possible `IOException`s with `try-catch-finally`
4. **Releases resources**: closes every stream in the `finally` block (sketched below)
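For reference, here is a minimal sketch of the pattern described above. The original snippet is not reproduced on this page, so the file names and the exact cleanup structure are assumptions:

```java
import java.io.*;

public class BufferedCopy {
    public static void main(String[] args) {
        BufferedInputStream bis = null;
        BufferedOutputStream bos = null;
        try {
            // Buffered wrappers cut down on raw disk reads/writes
            bis = new BufferedInputStream(new FileInputStream("1.txt"));
            bos = new BufferedOutputStream(new FileOutputStream("2.txt"));
            byte[] buffer = new byte[1024];
            int len;
            while ((len = bis.read(buffer)) != -1) {
                bos.write(buffer, 0, len);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Manual cleanup: each close() can itself throw, hence one try per stream
            try { if (bos != null) bos.close(); } catch (IOException e) { e.printStackTrace(); }
            try { if (bis != null) bis.close(); } catch (IOException e) { e.printStackTrace(); }
        }
    }
}
```

This verbosity in `finally` is exactly what the first suggestion below removes.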
### Optimization Suggestions
#### 1. Use try-with-resources (Java 7+)
The current explicit resource-closing logic is verbose and error-prone; it can be simplified to:
```java
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(srcFile));
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destFile))) {
byte[] buffer = new byte[1024];
int len;
while ((len = bis.read(buffer)) != -1) {
bos.write(buffer, 0, len);
}
} catch (IOException e) {
e.printStackTrace();
}
```
**Advantages**:
- Resources are closed automatically (any type implementing the `AutoCloseable` interface qualifies)
- Eliminates the verbose `finally` block[^4]
- Avoids nested `try-catch` structures
#### 2. Tune the Buffer Size
The code currently uses a fixed 1024-byte buffer:
```java
byte[] buffer = new byte[1024]; // could be tuned
```
**Suggestions**:
- Size the buffer to the file, e.g. 8 KB or 16 KB for large files (a toy sizing helper follows the snippet below)
- Or skip manual buffering entirely: use `BufferedInputStream`'s default buffer size (typically 8 KB), or let the JDK stream the bytes with `InputStream.transferTo` (Java 9+):
```java
try (InputStream is = Files.newInputStream(srcFile.toPath());
OutputStream os = Files.newOutputStream(destFile.toPath())) {
    is.transferTo(os); // efficient built-in transfer, Java 9+
}
```
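As a toy illustration of the first suggestion, a helper that picks a buffer size from the file size; the thresholds here are arbitrary assumptions, not measured values:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class Buffers {
    static int chooseBufferSize(Path file) throws IOException {
        long size = Files.size(file);
        if (size < 64 * 1024) return 1024;            // small file: 1 KB is plenty
        if (size < 8 * 1024 * 1024) return 8 * 1024;  // medium: 8 KB, the JDK default
        return 64 * 1024;                             // large file: bigger chunks, fewer syscalls
    }
}
```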
#### 3. Improve Path Handling
Hard-coded paths make the code inflexible:
```java
File srcFile = new File("/Users/.../1.txt"); // hard-coded path
```
**Improvements**:
- Parameterize the input instead:
```java
public static void copyFile(String srcPath, String destPath) { ... }
```
- Use `Paths.get()` instead of `new File()` (the NIO.2 API); a combined sketch follows
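Putting both improvements together, a sketch (the class name and the `main` wiring are illustrative):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileCopier {
    // Paths arrive as parameters rather than being hard-coded
    public static void copyFile(String srcPath, String destPath) throws IOException {
        Path src = Paths.get(srcPath);   // NIO.2 instead of new File(...)
        Path dest = Paths.get(destPath);
        try (InputStream is = Files.newInputStream(src);
             OutputStream os = Files.newOutputStream(dest)) {
            is.transferTo(os); // Java 9+
        }
    }

    public static void main(String[] args) throws IOException {
        // usage: java FileCopier <source> <target>
        copyFile(args[0], args[1]);
    }
}
```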
#### 4. Strengthen Exception Handling
The code currently only prints the stack trace:
```java
e.printStackTrace(); // needs improvement
```
**Directions**:
- Attach a descriptive error message instead of a bare stack trace
- Rethrow the exception to the caller, or record it in a log
- Handle specific exception types (such as `FileNotFoundException`), as sketched below
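A sketch combining the three points; the class name, logger setup, and message wording are assumptions rather than part of the original code:

```java
import java.io.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CopyWithBetterErrors {
    private static final Logger LOGGER = Logger.getLogger(CopyWithBetterErrors.class.getName());

    public static void copy(String src, String dest) throws IOException {
        try (InputStream is = new BufferedInputStream(new FileInputStream(src));
             OutputStream os = new BufferedOutputStream(new FileOutputStream(dest))) {
            is.transferTo(os); // Java 9+
        } catch (FileNotFoundException e) {
            // Specific type first: source missing, or destination not creatable
            LOGGER.log(Level.SEVERE, "File not found or not writable: " + src + " -> " + dest, e);
            throw e; // rethrow so the caller can decide how to recover
        } catch (IOException e) {
            LOGGER.log(Level.SEVERE, "Copy failed: " + src + " -> " + dest, e);
            throw e;
        }
    }
}
```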
#### 5. Performance Points
- **Drop the redundant `flush()`**: calling `bos.flush()` right before `close()` is unnecessary, because closing a buffered stream flushes it automatically[^4]
- **NIO file operations**: `Files.copy()` performs the copy in a single line and is more efficient:
```java
Files.copy(srcFile.toPath(), destFile.toPath());
```
Note that `Files.copy` throws `FileAlreadyExistsException` if the target already exists, unless `StandardCopyOption.REPLACE_EXISTING` is passed, as in the final version below.
### Final Optimized Version (Java 7+)
```java
import java.io.IOException;
import java.nio.file.*;

public class FileCopyOptimized {
public static void main(String[] args) {
Path src = Paths.get("source.txt");
Path dest = Paths.get("target.txt");
try {
Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
System.err.println("文件复制失败: " + e.getMessage());
}
}
}
```
### Optimization Comparison
| Problem in the original | Optimization | Benefit |
|-------------------------|---------------------------|----------------------------------|
| Manual resource closing | try-with-resources | Prevents resource leaks[^2] |
| Fixed buffer size | Dynamic sizing / NIO API | Faster copies of large files |
| Redundant flush | Remove the explicit flush | Fewer unnecessary I/O operations |
| Hard-coded paths | Parameterized input | Better code reuse |