活动介绍
file-type

基于Transformers的文本分类技术详解

下载需积分: 39 | 30.01MB | 更新于2025-02-07 | 37 浏览量 | 34 下载量 举报 2 收藏
download 立即下载
标题“Transformers_for_Text_Classification:基于Transformers的文本分类”介绍了本文件探讨的核心议题,即使用Transformers架构进行自然语言处理(NLP)中的文本分类任务。文本分类是指将文本数据按照预定的类别进行分类的技术,广泛应用于垃圾邮件检测、情感分析、新闻分类等场景。 描述部分提供了文档的具体内容概览。首先,它提到了基于最新版本v2.2.2的代码重构。这说明文档内容可能包含了对现有代码库的更新,确保其未来能够兼容并易于复用。重构是软件开发中常见的一次性活动,目的是改善代码的内部结构,而不影响其外部行为。 文档强调了几个关键特性,如支持transformer模型后接不同的特征提取器,这表明该工作不仅关注于Transformer模型本身,还包括了如何将这些模型与不同的预处理或特征提取策略相结合。测试集预测代码的提及意味着提供了一种方式去评估模型在未知数据上的表现,这对于模型的实际应用至关重要。 代码的精简是文档提到的另一项改进。在机器学习和深度学习领域,代码的简洁性有助于提高可读性、可维护性及运行效率。优化日志记录终端输出的说明,则暗示了作者对代码调试和结果追踪的重视。良好的日志记录机制对于模型训练的监控和错误诊断有着重要作用。 在支持的模型型号部分,列举了多种预训练模型,如“伯特”、“伯特·cnn”、“bert_lstm”、“伯特·格鲁”、“互联网”、“xlnet_cnn”、“xlnet_lstm”、“xlnet_gru”、“阿尔伯特”。这些模型名称说明了文档涵盖了多种基于Transformer架构的预训练语言模型及其变体。其中“伯特”指的是BERT(Bidirectional Encoder Representations from Transformers),而“xlnet”则是指XLNet。这些模型在NLP领域是当前最先进的一类预训练语言模型,代表了目前文本分类技术的前沿水平。 标签“nlp text-classification transformers NaturallanguageprocessingPython”则更准确地描述了文档的内容范畴,强调了自然语言处理、文本分类、Transformers模型和Python编程语言。Python是目前NLP领域中最流行的编程语言之一,有着丰富的库和框架支持。 最后,压缩包子文件的文件名称列表中的“Transformers_for_Text_Classification-master”暗示了这是一个开源项目或代码库,并且有主分支(master)存在。这可能意味着该项目有版本控制,便于协作开发和版本跟踪,同时也表明了开发者社区对该项目的关注和参与。 综上所述,该文件涉及了在文本分类任务中应用Transformers模型的一系列技术知识,包括模型重构、特征提取、测试预测、代码优化、日志记录以及对多种预训练模型的支持。这些知识点对于希望在文本分类领域中应用深度学习技术的开发者来说具有很高的参考价值。

相关推荐

filetype
### 文本分类 #### 数据预处理 要求训练集和测试集分开存储,对于中文的数据必须先分词,对分词后的词用空格符分开,并且将标签连接到每条数据的尾部,标签和句子用分隔符\分开。具体的如下: * 今天 的 天气 真好\积极 #### 文件结构介绍 * config文件:配置各种模型的配置参数 * data:存放训练集和测试集 * ckpt_model:存放checkpoint模型文件 * data_helpers:提供数据处理的方法 * pb_model:存放pb模型文件 * outputs:存放vocab,word_to_index, label_to_index, 处理后的数据 * models:存放模型代码 * trainers:存放训练代码 * predictors:存放预测代码 #### 训练模型 * python train.py --config_path="config/textcnn_config.json" #### 预测模型 * 预测代码都在predictors/predict.py中,初始化Predictor对象,调用predict方法即可。 #### 模型的配置参数详述 ##### textcnn:基于textcnn的文本分类 * model_name:模型名称 * epochs:全样本迭代次数 * checkpoint_every:迭代多少步保存一次模型文件 * eval_every:迭代多少步验证一次模型 * learning_rate:学习速率 * optimization:优化算法 * embedding_size:embedding层大小 * num_filters:卷积核的数量 * filter_sizes:卷积核的尺寸 * batch_size:批样本大小 * sequence_length:序列长度 * vocab_size:词汇表大小 * num_classes:样本的类别数,二分类时置为1,多分类时置为实际类别数 * keep_prob:保留神经元的比例 * l2_reg_lambda:L2正则化的系数,主要对全连接层的参数正则化 * max_grad_norm:梯度阶段临界值 * train_data:训练数据的存储路径 * eval_data:验证数据的存储路径 * stop_word:停用词表的存储路径 * output_path:输出路径,用来存储vocab,处理后的训练数据,验证数据 * word_vectors_path:词向量的路径 * ckpt_model_path:checkpoint 模型的存储路径 * pb_model_path:pb 模型的存储路径 ##### bilstm:基于bilstm的文本分类 * model_name:模型名称 * epochs:全样本迭代次数 * checkpoint_every:迭代多少步保存一次模型文件 * eval_every:迭代多少步验证一次模型 * learning_rate:学习速率 * optimization:优化算法 * embedding_size:embedding层大小 * hidden_sizes:lstm的隐层大小,列表对象,支持多层lstm,只要在列表中添加相应的层对应的隐层大小 * batch_size:批样本大小 * sequence_length:序列长度 * vocab_size:词汇表大小 * num_classes:样本的类别数,二分类时置为1,多分类时置为实际类别数 * keep_prob:保留神经元的比例 * l2_reg_lambda:L2正则化的系数,主要对全连接层的参数正则化 * max_grad_norm:梯度阶段临界值 * train_data:训练数据的存储路径 * eval_data:验证数据的存储路径 * stop_word:停用词表的存储路径 * output_path:输出路径,用来存储vocab,处理后的训练数据,验证数据 * word_vectors_path:词向量的路径 * ckpt_model_path:checkpoint 模型的存储路径 * pb_model_path:pb 模型的存储路径 ##### bilstm atten:基于bilstm + attention 的文本分类 * model_name:模型名称 * epochs:全样本迭代次数 * checkpoint_every:迭代多少步保存一次模型文件 * eval_every:迭代多少步验证一次模型 * learning_rate:学习速率 * optimization:优化算法 * embedding_size:embedding层大小 * hidd
filetype

基于Transformer搜狐新闻文本分类 本实验主要基于Transformer来实现对搜狐新闻文本分类,大致步骤如下。 具体实验步骤: 步骤1.数据准备 训练集共有24000条样本,12个分类,每个分类2000条样本。 测试集共有12000条样本,12个分类,每个分类1000条样本。 载入分词文件的代码如下: with open('cutWords_list.txt') as file: cutWords_list = [k.split() for k in file.readlines()] 对数据集进行文本预处理,并划分出训练集、验证集和测试集; 步骤2:词向量模型训练 使用步骤1中处理后的全部新闻训练word2vec词向量模型,保存训练好的word2vec词向量模型。 步骤3:基于transformer序列编码构建层级神经网络 基于transformer序列编码构建层级神经网络,用于提取文本特征; 步骤4:文本特征提取 使用步骤2的词向量模型将步骤1中预处理后的新闻转化为向量表示,将向量输入步骤3构建的层级神经网络,输出的向量为新闻的文本特征; 步骤5.模型训练、模型评估 模型训练、模型评估的样例结果 步骤6.模型测试 需要打印出在在新样本上的测试结果(注意不能用下面用例测试),如: 注意:参考代码为https://github.com/percent4/pytorch_transformer_chinese_text_classification https://github.com/649453932/Chinese-Text-Classification-Pytorch 或找其他类似的代码进行实现。

filetype

import torch import json import os import argparse import numpy as np import re from torch.utils.data import Dataset, DataLoader from tqdm import tqdm from PIL import Image from peft import LoraConfig, get_peft_model from transformers import ( AutoModelForCausalLM, AutoProcessor, TrainingArguments, BitsAndBytesConfig, GenerationConfig, AutoTokenizer, AutoImageProcessor, get_cosine_schedule_with_warmup ) import torch.optim as optim from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, classification_report import warnings warnings.filterwarnings("ignore", message="Could not find a config file") # 标签映射定义 TASK1_LABEL_MAP = {"无害": 0, "有害": 1} TASK2_LABEL_MAP = { 0: "无害", 1: "针对性有害", 2: "一般性有害", 3: "性暗示", 4: "沮丧文化" } class HarmfulMemeDataset(Dataset): def __init__(self, annotation_path, tokenizer, image_processor, task_name, image_root=None): print(f"Loading annotations from: {annotation_path}") self.image_root = image_root self.tokenizer = tokenizer self.image_processor = image_processor self.processor = tokenizer # 关键修复:添加processor属性引用 with open(annotation_path, 'r', encoding='utf-8') as f: self.data = json.load(f) self.task_name = task_name self.label_token_ids = self._get_label_tokens() def _get_label_tokens(self): """获取标签对应的token ID""" if self.task_name == "task_1": labels = ["无害", "有害"] else: labels = [str(i) for i in range(5)] # 0-4 token_ids = {} tokenizer = self.processor.tokenizer for label in labels: tokens = tokenizer.encode(label) token_ids[label] = tokens # 保存完整的token序列 if len(tokens) > 1: print(f"警告: 标签 '{label}' 被分词为多个token: {tokens}") return token_ids def __len__(self): return len(self.data) def __getitem__(self, idx): sample = self.data[idx] if self.image_root: image_path = os.path.join(self.image_root, sample["path"]) else: image_path = sample["path"] if not os.path.exists(image_path): raise FileNotFoundError(f"图片不存在:{image_path}") image = Image.open(image_path).convert("RGB") if self.task_name == "task_1": user_content = f"任务:有害模因检测(是否有害)。\n输入文本:'{sample['text']}'\n请判断此图文组合是否有害。" raw_label = sample['label']# 原始标签是字符串("无害"或"有害") # 根据TASK1_LABEL_MAP进行映射 if isinstance(raw_label, int): # 如果是整数,转换为对应的字符串标签 label_map = {v: k for k, v in TASK1_LABEL_MAP.items()} # 反转映射 label = label_map.get(raw_label, "无害") # 默认值为"无害" else: # 如果已经是字符串,直接使用 label = raw_label label_token = self.label_token_ids[label] assistant_content = f"结论:{label}。\n理由:{sample['explanation']}" else: user_content = f"任务:有害模因类型分类。\n输入文本:'{sample['text']}'\n请判断此图文组合的有害类型(0-4)。" raw_label = str(sample['type'])# 将整数标签转换为字符串 label = str(raw_label) label_token = self.label_token_ids[label] assistant_content = f"结论:{label}。\n理由:{sample['explanation']}" messages = [ {"role": "user", "content": [{"type": "image"}, {"type": "text", "text": user_content}]}, {"role": "assistant", "content": [{"type": "text", "text": assistant_content}]} ] prompt = self.processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, chat_format="chatml" ) # 单独处理图像 image = self.image_processor( images=image, return_tensors="pt" )["pixel_values"].squeeze(0) # 单独处理文本 encoding = self.tokenizer( text=prompt, return_tensors="pt", padding=False, truncation=False ) prompt_tokens = encoding["input_ids"][0].tolist() # 找到结论标签的位置 conclusion_start = self.processor.tokenizer.encode("结论:") # 在prompt中查找"结论:"的位置 start_idx = -1 for i in range(len(prompt_tokens) - len(conclusion_start) + 1): if prompt_tokens[i:i+len(conclusion_start)] == conclusion_start: start_idx = i + len(conclusion_start) break inputs = self.processor( text=prompt, images=image, return_tensors="pt", padding="max_length", truncation=True, max_length=512 ) inputs = {k: v.squeeze(0) for k, v in inputs.items()} # 创建标签张量,只标记结论位置 labels = torch.full_like(inputs["input_ids"], fill_value=-100, dtype=torch.long) if start_idx != -1 and start_idx < len(labels): # 标记整个标签token序列 label_tokens = self.label_token_ids[label] for i, token_id in enumerate(label_tokens): if start_idx + i < len(labels): labels[start_idx + i] = token_id inputs["labels"] = labels return inputs def parse_generated_text(self,text): """解析生成的文本,提取结论标签""" conclusion_match = re.search(r"结论[::]\s*(\S+)", text) if not conclusion_match: return None conclusion = conclusion_match.group(1).strip().rstrip('。.') # 处理多token标签 if conclusion in ["无害", "有害"]: # 任务1标签 return conclusion elif conclusion.isdigit() and 0 <= int(conclusion) <= 4: # 任务2标签 return conclusion # 尝试分词匹配 tokenizer = AutoProcessor.from_pretrained(args.model_id).tokenizer conclusion_tokens = tokenizer.encode(conclusion, add_special_tokens=False) # 与已知标签的token序列匹配 for label, tokens in self.label_token_ids.items(): if conclusion_tokens == tokens: return label return None def compute_metrics(task_name, preds, labels): """计算评估指标""" mask = labels != -100 preds = preds[mask] labels = labels[mask] if task_name == "task_1": # 二分类任务 return { "accuracy": accuracy_score(labels, preds), "f1": f1_score(labels, preds, average="binary"), "precision": precision_score(labels, preds, average="binary"), "recall": recall_score(labels, preds, average="binary") } else: # 多分类任务 report = classification_report(labels, preds, output_dict=True, zero_division=0) return { "accuracy": accuracy_score(labels, preds), "f1_macro": f1_score(labels, preds, average="macro"), "precision_macro": precision_score(labels, preds, average="macro"), "recall_macro": recall_score(labels, preds, average="macro"), "class_report": report } def main(args): os.environ["TOKENIZERS_PARALLELISM"] = "false" # 1. 加载模型和预处理器 print("Loading model and processor...") quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16 ) model = AutoModelForCausalLM.from_pretrained( args.model_id, quantization_config=quantization_config, trust_remote_code=True, device_map="auto", bf16=True ) model.generation_config = GenerationConfig.from_pretrained( args.model_id, trust_remote_code=True, chat_format="chatml", max_new_tokens=100, pad_token_id=model.generation_config.eos_token_id ) # 分别初始化文本和图像处理器 tokenizer = AutoTokenizer.from_pretrained( args.model_id, trust_remote_code=True, pad_token='<|endoftext|>' # 显式设置pad_token ) image_processor = AutoImageProcessor.from_pretrained( args.model_id, trust_remote_code=True ) tokenizer.chat_template = """{% for message in messages %} <|im_start|>{{ message['role'] }} {{ message['content'] }} <|im_end|> {% endfor %} {% if add_generation_prompt %} <|im_start|>assistant {% endif %}""" # 设置pad token # 确保pad_token正确设置 if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = tokenizer.eos_token_id # 2. LoRA配置 print("Configuring LoRA...") lora_config = LoraConfig( r=args.lora_rank, lora_alpha=args.lora_alpha, lora_dropout=args.lora_dropout, bias="none", task_type="CAUSAL_LM", target_modules=[ "c_attn", "c_proj", "w1", "w2", "w3", "visual.proj", "visual.image_encoder" ] ) peft_model = get_peft_model(model, lora_config) peft_model.print_trainable_parameters() # 3. 初始化优化器和调度器 optimizer = optim.AdamW( peft_model.parameters(), lr=args.learning_rate, weight_decay=args.weight_decay ) # 4. 训练参数配置 training_args = TrainingArguments( output_dir=os.path.join(args.output_dir, args.task), num_train_epochs=args.epochs, per_device_train_batch_size=args.batch_size, per_device_eval_batch_size=args.eval_batch_size, gradient_accumulation_steps=args.grad_accum_steps, learning_rate=args.learning_rate, weight_decay=args.weight_decay, lr_scheduler_type="cosine", logging_strategy="steps", logging_steps=10, save_strategy="epoch", eval_strategy="epoch", eval_accumulation_steps=1, metric_for_best_model="f1" if args.task == "task_1" else "f1_macro", greater_is_better=True, load_best_model_at_end=True, bf16=True, report_to="none", remove_unused_columns=False, disable_tqdm=False, skip_memory_metrics=True, dataloader_pin_memory=False, ) # 5. 加载数据集 print(f"Loading datasets for {args.task}...") train_dataset = HarmfulMemeDataset( annotation_path=args.train_annotation_path, tokenizer=tokenizer, image_processor=image_processor, task_name=args.task, image_root=args.image_root ) test_dataset = HarmfulMemeDataset( annotation_path=args.test_annotation_path, tokenizer=tokenizer, image_processor=image_processor, task_name=args.task, image_root=args.image_root ) # 创建数据加载器 train_loader = DataLoader( train_dataset, batch_size=args.batch_size, shuffle=True, num_workers=args.num_workers, pin_memory=True ) eval_loader = DataLoader( test_dataset, batch_size=args.eval_batch_size, shuffle=False, num_workers=args.num_workers, pin_memory=True ) # 计算总步数,初始化学习率调度器 total_train_steps = len(train_loader) // args.grad_accum_steps * args.epochs scheduler = get_cosine_schedule_with_warmup( optimizer, num_warmup_steps=args.warmup_steps, num_training_steps=total_train_steps ) # 6. 训练循环 print(f"Starting {args.task} training...") best_metric = -1 for epoch in range(args.epochs): print(f"\n===== Epoch {epoch + 1}/{args.epochs} =====") # 训练阶段 peft_model.train() total_train_loss = 0.0 train_pbar = tqdm(train_loader, desc=f"Training Epoch {epoch + 1}", unit="batch") for step, batch in enumerate(train_pbar): batch = {k: v.to(peft_model.device) for k, v in batch.items()} # 前向传播 outputs = peft_model(**batch) loss = outputs.loss total_train_loss += loss.item() # 梯度累积 loss = loss / args.grad_accum_steps loss.backward() # 参数更新 if (step + 1) % args.grad_accum_steps == 0: optimizer.step() scheduler.step() optimizer.zero_grad() # 更新进度条 train_pbar.set_postfix({"loss": f"{loss.item() * args.grad_accum_steps:.4f}"}) avg_train_loss = total_train_loss / len(train_loader) print(f"Epoch {epoch + 1} 平均训练损失: {avg_train_loss:.4f}") # 评估阶段 peft_model.eval() all_preds = [] all_labels = [] all_generated_texts = [] eval_pbar = tqdm(eval_loader, desc=f"Evaluating Epoch {epoch + 1}", unit="batch") with torch.no_grad(): for batch in eval_pbar: # 获取真实标签 labels = batch["labels"].cpu().numpy() mask = labels != -100 valid_labels = labels[mask].reshape(-1) # 生成文本 inputs = {k: v.to(peft_model.device) for k, v in batch.items() if k != "labels"} pad_token_id = tokenizer.pad_token_id or tokenizer.eos_token_id generated_ids = peft_model.generate( **inputs, generation_config=model.generation_config, pad_token_id=pad_token_id # 使用修正后的值 ) # 解码生成的文本 generated_texts = tokenizer.batch_decode( generated_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True ) # 解析生成的文本获取预测标签 batch_preds = [] for text in generated_texts: conclusion = parse_generated_text(text) # 提取assistant的响应部分 if "<|im_start|>assistant" in text: response = text.split("<|im_start|>assistant")[-1].strip() else: response = text # 解析结论 conclusion = parse_generated_text(response) if conclusion is None: # 无法解析结论,使用默认值 pred_label = 0 if args.task == "task_1" else "0" else: pred_label = conclusion # 转换为数字标签 if args.task == "task_1": # 二分类任务 if "无害" in pred_label: pred_value = 0 elif "有害" in pred_label: pred_value = 1 else: # 无法解析,使用默认值 pred_value = 0 else: # 多分类任务 if pred_label in ["0", "1", "2", "3", "4"]: pred_value = int(pred_label) else: # 无法解析,使用默认值 pred_value = 0 batch_preds.append(pred_value) all_preds.extend(batch_preds) all_labels.extend(valid_labels.tolist()) all_generated_texts.extend(generated_texts) # 计算评估指标 metrics = compute_metrics(args.task, np.array(all_preds), np.array(all_labels)) # 打印评估结果 print("\n评估指标:") print("=" * 50) if args.task == "task_1": print(f"Accuracy: {metrics['accuracy']:.4f}") print(f"F1 Score: {metrics['f1']:.4f}") print(f"Precision: {metrics['precision']:.4f}") print(f"Recall: {metrics['recall']:.4f}") else: print(f"Accuracy: {metrics['accuracy']:.4f}") print(f"Macro F1: {metrics['f1_macro']:.4f}") print(f"Macro Precision: {metrics['precision_macro']:.4f}") print(f"Macro Recall: {metrics['recall_macro']:.4f}") print("\n分类报告:") print(classification_report(all_labels, all_preds, target_names=list(TASK2_LABEL_MAP.values()), zero_division=0)) print("=" * 50) # 保存最佳模型 current_metric = metrics["f1"] if args.task == "task_1" else metrics["f1_macro"] if current_metric > best_metric: best_metric = current_metric save_path = os.path.join(training_args.output_dir, f"best_model_epoch{epoch+1}") print(f"保存最佳模型(指标 {current_metric:.4f})到 {save_path}") peft_model.save_pretrained(save_path) # 保存生成的文本示例 sample_output_path = os.path.join(save_path, "sample_outputs.txt") with open(sample_output_path, "w", encoding="utf-8") as f: for i, text in enumerate(all_generated_texts[:10]): f.write(f"样本 {i+1}:\n") f.write(text) f.write("\n" + "-"*80 + "\n") print(f"训练完成!最佳指标: {best_metric:.4f}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="训练有害模因检测模型") parser.add_argument("--model_id", default="/xzwu/Qwen-VL-Chat", help="预训练模型路径") parser.add_argument("--output_dir", default="/xzwu/explain-m3-adapter", help="输出目录") parser.add_argument("--epochs", type=int, default=5, help="训练轮数") parser.add_argument("--batch_size", type=int, default=4, help="训练批次大小") parser.add_argument("--eval_batch_size", type=int, default=4, help="评估批次大小") parser.add_argument("--grad_accum_steps", type=int, default=2, help="梯度累积步数") parser.add_argument("--learning_rate", type=float, default=1e-5, help="学习率") parser.add_argument("--weight_decay", type=float, default=0.01, help="权重衰减") parser.add_argument("--warmup_steps", type=int, default=100, help="预热步数") parser.add_argument("--lora_rank", type=int, default=8, help="LoRA秩") parser.add_argument("--lora_alpha", type=int, default=16, help="LoRA alpha") parser.add_argument("--lora_dropout", type=float, default=0.1, help="LoRA dropout") parser.add_argument("--num_workers", type=int, default=4, help="数据加载工作线程数") parser.add_argument("--task", choices=["task_1", "task_2"], default="task_1", help="任务类型") parser.add_argument("--train_annotation_path", default="/xzwu/data/data/train_data_explanation.json", help="训练标注路径") parser.add_argument("--test_annotation_path", default="/xzwu/data/data/test_data_explanation.json", help="测试标注路径") parser.add_argument("--image_root", default="/xzwu/data/meme", help="图片根目录") args = parser.parse_args() # 打印配置 print("=" * 50) print("训练配置:") for arg in vars(args): print(f"{arg}: {getattr(args, arg)}") print("=" * 50) main(args)运行以上代码报错:Traceback (most recent call last): File "/xzwu/explain-m3/explain-m3-project/train2.py", line 532, in <module> main(args) File "/xzwu/explain-m3/explain-m3-project/train2.py", line 424, in main conclusion = parse_generated_text(text) TypeError: parse_generated_text() missing 1 required positional argument: 'text'

filetype

86 bert_history = bert_model.fit( 87 train_ds, 88 validation_data=test_ds, 89 epochs=3, 90 verbose=1 报错:InvalidArgumentError: Graph execution error: Detected at node 'tf_bert_for_sequence_classification/bert/embeddings/assert_less/Assert/Assert' defined at (most recent call last): File "D:\Anaconda\envs\pytorch1\lib\runpy.py", line 192, in _run_module_as_main return _run_code(code, main_globals, None, File "D:\Anaconda\envs\pytorch1\lib\runpy.py", line 85, in _run_code exec(code, run_globals) File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel_launcher.py", line 17, in <module> app.launch_new_instance() File "D:\Anaconda\envs\pytorch1\lib\site-packages\traitlets\config\application.py", line 1075, in launch_instance app.start() File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel\kernelapp.py", line 701, in start self.io_loop.start() File "D:\Anaconda\envs\pytorch1\lib\site-packages\tornado\platform\asyncio.py", line 205, in start self.asyncio_loop.run_forever() File "D:\Anaconda\envs\pytorch1\lib\asyncio\windows_events.py", line 316, in run_forever super().run_forever() File "D:\Anaconda\envs\pytorch1\lib\asyncio\base_events.py", line 563, in run_forever self._run_once() File "D:\Anaconda\envs\pytorch1\lib\asyncio\base_events.py", line 1844, in _run_once handle._run() File "D:\Anaconda\envs\pytorch1\lib\asyncio\events.py", line 81, in _run self._context.run(self._callback, *self._args) File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel\kernelbase.py", line 534, in dispatch_queue await self.process_one() File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel\kernelbase.py", line 523, in process_one await dispatch(*args) File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel\kernelbase.py", line 429, in dispatch_shell await result File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel\kernelbase.py", line 767, in execute_request reply_content = await reply_content File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel\ipkernel.py", line 429, in do_execute res = shell.run_cell( File "D:\Anaconda\envs\pytorch1\lib\site-packages\ipykernel\zmqshell.py", line 549, in run_cell return super().run_cell(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\IPython\core\interactiveshell.py", line 3009, in run_cell result = self._run_cell( File "D:\Anaconda\envs\pytorch1\lib\site-packages\IPython\core\interactiveshell.py", line 3064, in _run_cell result = runner(coro) File "D:\Anaconda\envs\pytorch1\lib\site-packages\IPython\core\async_helpers.py", line 129, in _pseudo_sync_runner coro.send(None) File "D:\Anaconda\envs\pytorch1\lib\site-packages\IPython\core\interactiveshell.py", line 3269, in run_cell_async has_raised = await self.run_ast_nodes(code_ast.body, cell_name, File "D:\Anaconda\envs\pytorch1\lib\site-packages\IPython\core\interactiveshell.py", line 3448, in run_ast_nodes if await self.run_code(code, result, async_=asy): File "D:\Anaconda\envs\pytorch1\lib\site-packages\IPython\core\interactiveshell.py", line 3508, in run_code exec(code_obj, self.user_global_ns, self.user_ns) File "C:\Users\豆崽\AppData\Local\Temp\ipykernel_20320\247762855.py", line 86, in <module> bert_history = bert_model.fit( File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\modeling_tf_utils.py", line 1229, in fit return super().fit(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 65, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\training.py", line 1742, in fit tmp_logs = self.train_function(iterator) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\training.py", line 1338, in train_function return step_function(self, iterator) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\training.py", line 1322, in step_function outputs = model.distribute_strategy.run(run_step, args=(data,)) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\training.py", line 1303, in run_step outputs = model.train_step(data) File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\modeling_tf_utils.py", line 1672, in train_step y_pred = self(x, training=True) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 65, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\training.py", line 569, in __call__ return super().__call__(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 65, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\base_layer.py", line 1150, in __call__ outputs = call_fn(inputs, *args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 96, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\modeling_tf_utils.py", line 1734, in run_call_with_unpacked_inputs if not self._using_dummy_loss and parse(tf.__version__) < parse("2.11.0"): File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\models\bert\modeling_tf_bert.py", line 1746, in call outputs = self.bert( File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 65, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\base_layer.py", line 1150, in __call__ outputs = call_fn(inputs, *args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 96, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\modeling_tf_utils.py", line 1734, in run_call_with_unpacked_inputs if not self._using_dummy_loss and parse(tf.__version__) < parse("2.11.0"): File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\models\bert\modeling_tf_bert.py", line 887, in call embedding_output = self.embeddings( File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 65, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\engine\base_layer.py", line 1150, in __call__ outputs = call_fn(inputs, *args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\keras\src\utils\traceback_utils.py", line 96, in error_handler return fn(*args, **kwargs) File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\models\bert\modeling_tf_bert.py", line 180, in call if input_ids is not None: File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\models\bert\modeling_tf_bert.py", line 181, in call check_embeddings_within_bounds(input_ids, self.config.vocab_size) File "D:\Anaconda\envs\pytorch1\lib\site-packages\transformers\tf_utils.py", line 190, in check_embeddings_within_bounds tf.debugging.assert_less( Node: 'tf_bert_for_sequence_classification/bert/embeddings/assert_less/Assert/Assert' assertion failed: [The maximum value of input_ids (Tensor(\"tf_bert_for_sequence_classification/bert/embeddings/Max:0\", shape=(), dtype=int32)) must be smaller than the embedding layer\'s input dimension (30522). The likely cause is some problem at tokenization time.] [Condition x < y did not hold element-wise:] [x (IteratorGetNext:1) = ] [[101 2746 14667...]...] [y (tf_bert_for_sequence_classification/bert/embeddings/Cast/x:0) = ] [30522] [[{{node tf_bert_for_sequence_classification/bert/embeddings/assert_less/Assert/Assert}}]] [Op:__inference_train_function_73426]

filetype

检查并优化代码: import sys import os import json import time import wave import numpy as np import pandas as pd import matplotlib.pyplot as plt import soundfile as sf from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, QLineEdit, QTextEdit, QFileDialog, QProgressBar, QGroupBox, QComboBox, QCheckBox) from PyQt5.QtCore import QThread, pyqtSignal from pydub import AudioSegment from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification import whisper from pyannote.audio import Pipeline from docx import Document from docx.shared import Inches import librosa import tempfile from collections import defaultdict import re class AnalysisThread(QThread): progress = pyqtSignal(int) message = pyqtSignal(str) analysis_complete = pyqtSignal(dict) error = pyqtSignal(str) def __init__(self, audio_files, keyword_file, whisper_model_path, pyannote_model_path, emotion_model_path): super().__init__() self.audio_files = audio_files self.keyword_file = keyword_file self.whisper_model_path = whisper_model_path self.pyannote_model_path = pyannote_model_path self.emotion_model_path = emotion_model_path self.running = True self.cached_models = {} def run(self): try: # 加载关键词 self.message.emit("正在加载关键词...") keywords = self.load_keywords() # 预加载模型 self.message.emit("正在预加载模型...") self.preload_models() results = [] total_files = len(self.audio_files) for idx, audio_file in enumerate(self.audio_files): if not self.running: self.message.emit("分析已停止") return self.message.emit(f"正在处理文件: {os.path.basename(audio_file)} ({idx + 1}/{total_files})") file_result = self.analyze_file(audio_file, keywords) if file_result: results.append(file_result) self.progress.emit(int((idx + 1) / total_files * 100)) self.analysis_complete.emit({"results": results, "keywords": keywords}) self.message.emit("分析完成!") except Exception as e: import traceback self.error.emit(f"分析过程中发生错误: {str(e)}\n{traceback.format_exc()}") def preload_models(self): """预加载所有模型到缓存""" # 检查是否已加载模型 if hasattr(self, 'cached_models') and self.cached_models: return self.cached_models = {} # 加载语音识别模型 if 'whisper' not in self.cached_models: self.message.emit("正在加载语音识别模型...") self.cached_models['whisper'] = whisper.load_model(self.whisper_model_path) # 加载说话人分离模型 if 'pyannote' not in self.cached_models: self.message.emit("正在加载说话人分离模型...") self.cached_models['pyannote'] = Pipeline.from_pretrained(self.pyannote_model_path) # 加载情感分析模型 if 'emotion_classifier' not in self.cached_models: self.message.emit("正在加载情感分析模型...") tokenizer = AutoTokenizer.from_pretrained(self.emotion_model_path) model = AutoModelForSequenceClassification.from_pretrained(self.emotion_model_path) self.cached_models['emotion_classifier'] = pipeline( "text-classification", model=model, tokenizer=tokenizer, device=0 if torch.cuda.is_available() else -1 # 使用GPU如果可用 ) def analyze_file(self, audio_file, keywords): """分析单个音频文件""" try: # 确保音频为WAV格式 wav_file = self.convert_to_wav(audio_file) # 获取音频信息 duration, sample_rate, channels = self.get_audio_info(wav_file) # 说话人分离 diarization = self.cached_models['pyannote'](wav_file) # 识别客服和客户(使用改进的方法) agent_segments, customer_segments = self.identify_speakers(wav_file, diarization, keywords['opening']) # 语音识别(使用优化后的方法) whisper_model = self.cached_models['whisper'] agent_text = self.transcribe_audio(wav_file, agent_segments, whisper_model) customer_text = self.transcribe_audio(wav_file, customer_segments, whisper_model) # 情感分析 emotion_classifier = self.cached_models['emotion_classifier'] agent_emotion = self.analyze_emotion(agent_text, emotion_classifier) customer_emotion = self.analyze_emotion(customer_text, emotion_classifier) # 服务规范检查 opening_check = self.check_opening(agent_text, keywords['opening']) closing_check = self.check_closing(agent_text, keywords['closing']) forbidden_check = self.check_forbidden(agent_text, keywords['forbidden']) # 沟通技巧分析(使用改进的方法) speech_rate = self.analyze_speech_rate(wav_file, agent_segments) volume_analysis = self.analyze_volume(wav_file, agent_segments) # 问题解决率分析 resolution_rate = self.analyze_resolution(agent_text, customer_text, keywords['resolution']) # 构建结果 return { "file_name": os.path.basename(audio_file), "duration": duration, "agent_text": agent_text, "customer_text": customer_text, "opening_check": opening_check, "closing_check": closing_check, "forbidden_check": forbidden_check, "agent_emotion": agent_emotion, "customer_emotion": customer_emotion, "speech_rate": speech_rate, "volume_mean": volume_analysis['mean'], "volume_std": volume_analysis['std'], "resolution_rate": resolution_rate } except Exception as e: self.error.emit(f"处理文件 {os.path.basename(audio_file)} 时出错: {str(e)}") return None def load_keywords(self): """从Excel文件加载关键词""" try: df = pd.read_excel(self.keyword_file) keywords = { "opening": [str(k).strip() for k in df['opening'].dropna().tolist()], "closing": [str(k).strip() for k in df['closing'].dropna().tolist()], "forbidden": [str(k).strip() for k in df['forbidden'].dropna().tolist()], "resolution": [str(k).strip() for k in df['resolution'].dropna().tolist()] } return keywords except Exception as e: raise Exception(f"加载关键词文件失败: {str(e)}") def convert_to_wav(self, audio_file): """将音频文件转换为WAV格式(如果需要)""" try: if not audio_file.lower().endswith('.wav'): # 使用临时文件避免磁盘IO with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmpfile: output_file = tmpfile.name audio = AudioSegment.from_file(audio_file) audio.export(output_file, format='wav') return output_file return audio_file except Exception as e: raise Exception(f"音频转换失败: {str(e)}") def get_audio_info(self, wav_file): """获取音频文件信息""" try: with wave.open(wav_file, 'rb') as wf: frames = wf.getnframes() rate = wf.getframerate() channels = wf.getnchannels() duration = frames / float(rate) return duration, rate, channels except Exception as e: raise Exception(f"获取音频信息失败: {str(e)}") def identify_speakers(self, wav_file, diarization, opening_keywords): """改进的客服识别方法 - 检查前三个片段是否有开场白关键词""" speaker_segments = defaultdict(list) for segment, _, speaker in diarization.itertracks(yield_label=True): speaker_segments[speaker].append((segment.start, segment.end)) # 如果没有说话人 if not speaker_segments: return [], [] # 如果只有一个说话人 if len(speaker_segments) == 1: speaker = list(speaker_segments.keys())[0] return speaker_segments[speaker], [] # 检查每个说话人的前三个片段是否有开场白 speaker_scores = {} whisper_model = self.cached_models['whisper'] for speaker, segments in speaker_segments.items(): score = 0 # 取前三个片段(或所有片段如果少于3个) check_segments = segments[:3] for start, end in check_segments: # 转录片段 text = self.transcribe_audio_segment(wav_file, [(start, end)], whisper_model) # 检查开场白关键词 for keyword in opening_keywords: if keyword and keyword in text: score += 1 break speaker_scores[speaker] = score # 找到得分最高的说话人作为客服 agent_speaker = max(speaker_scores, key=speaker_scores.get) agent_segments = [] customer_segments = [] for speaker, segments in speaker_segments.items(): if speaker == agent_speaker: agent_segments = segments else: customer_segments.extend(segments) return agent_segments, customer_segments def transcribe_audio_segment(self, wav_file, segments, model): """转录单个音频片段 - 用于客服识别""" if not segments: return "" # 使用pydub加载音频 audio = AudioSegment.from_wav(wav_file) start, end = segments[0] # 转换为毫秒 start_ms = int(start * 1000) end_ms = int(end * 1000) segment_audio = audio[start_ms:end_ms] # 使用临时文件 with tempfile.NamedTemporaryFile(suffix='.wav') as tmpfile: segment_audio.export(tmpfile.name, format="wav") result = model.transcribe(tmpfile.name) return result['text'] def transcribe_audio(self, wav_file, segments, model): """优化后的转录方法 - 按片段转录""" if not segments: return "" # 使用pydub加载音频 audio = AudioSegment.from_wav(wav_file) full_text = "" # 只处理指定片段 for start, end in segments: # 转换为毫秒 start_ms = int(start * 1000) end_ms = int(end * 1000) segment_audio = audio[start_ms:end_ms] # 使用临时文件避免内存占用 with tempfile.NamedTemporaryFile(suffix='.wav') as tmpfile: segment_audio.export(tmpfile.name, format="wav") result = model.transcribe(tmpfile.name) full_text += result['text'] + " " return full_text.strip() def analyze_emotion(self, text, classifier): """分析文本情感""" if not text.strip(): return {"label": "中性", "score": 0.0} # 截断长文本以提高性能 if len(text) > 500: text = text[:500] result = classifier(text, truncation=True, max_length=512) return { "label": result[0]['label'], "score": result[0]['score'] } def check_opening(self, text, opening_keywords): """检查开场白""" return any(keyword in text for keyword in opening_keywords if keyword) def check_closing(self, text, closing_keywords): """检查结束语""" return any(keyword in text for keyword in closing_keywords if keyword) def check_forbidden(self, text, forbidden_keywords): """检查服务禁语""" return any(keyword in text for keyword in forbidden_keywords if keyword) def analyze_speech_rate(self, wav_file, segments): """改进的语速分析 - 基于实际识别文本""" if not segments: return 0 # 加载音频 y, sr = librosa.load(wav_file, sr=None) total_chars = 0 total_duration = 0 whisper_model = self.cached_models['whisper'] for start, end in segments: # 计算片段时长(秒) duration = end - start total_duration += duration # 转录片段 text = self.transcribe_audio_segment(wav_file, [(start, end)], whisper_model) # 计算中文字符数(去除标点和空格) chinese_chars = sum(1 for char in text if '\u4e00' <= char <= '\u9fff') total_chars += chinese_chars if total_duration == 0: return 0 # 语速 = 总字数 / 总时长(分钟) return total_chars / (total_duration / 60) def analyze_volume(self, wav_file, segments): """改进的音量分析 - 使用librosa计算RMS分贝值""" if not segments: return {"mean": -60, "std": 0} # 加载音频 y, sr = librosa.load(wav_file, sr=None) all_dB = [] for start, end in segments: start_sample = int(start * sr) end_sample = int(end * sr) segment_audio = y[start_sample:end_sample] # 计算RMS并转换为dB rms = librosa.feature.rms(y=segment_audio)[0] dB = librosa.amplitude_to_db(rms, ref=np.max) all_dB.extend(dB) if not all_dB: return {"mean": -60, "std": 0} return { "mean": float(np.mean(all_dB)), "std": float(np.std(all_dB)) } def analyze_resolution(self, agent_text, customer_text, resolution_keywords): """分析问题解决率""" return any(keyword in agent_text for keyword in resolution_keywords if keyword) def stop(self): """停止分析""" self.running = False class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("外呼电话录音包质检分析系统") self.setGeometry(100, 100, 1000, 700) # 初始化变量 self.audio_files = [] self.keyword_file = "" self.whisper_model_path = "./models/whisper-small" self.pyannote_model_path = "./models/pyannote-speaker-diarization" self.emotion_model_path = "./models/Erlangshen-Roberta-110M-Sentiment" self.output_dir = "./reports" # 创建主控件 central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 文件选择区域 file_group = QGroupBox("文件选择") file_layout = QVBoxLayout(file_group) # 音频文件选择 audio_layout = QHBoxLayout() self.audio_label = QLabel("音频文件/文件夹:") audio_layout.addWidget(self.audio_label) self.audio_path_edit = QLineEdit() audio_layout.addWidget(self.audio_path_edit) self.audio_browse_btn = QPushButton("浏览...") self.audio_browse_btn.clicked.connect(self.browse_audio) audio_layout.addWidget(self.audio_browse_btn) file_layout.addLayout(audio_layout) # 关键词文件选择 keyword_layout = QHBoxLayout() self.keyword_label = QLabel("关键词文件:") keyword_layout.addWidget(self.keyword_label) self.keyword_path_edit = QLineEdit() keyword_layout.addWidget(self.keyword_path_edit) self.keyword_browse_btn = QPushButton("浏览...") self.keyword_browse_btn.clicked.connect(self.browse_keyword) keyword_layout.addWidget(self.keyword_browse_btn) file_layout.addLayout(keyword_layout) main_layout.addWidget(file_group) # 模型设置区域 model_group = QGroupBox("模型设置") model_layout = QVBoxLayout(model_group) # Whisper模型路径 whisper_layout = QHBoxLayout() whisper_layout.addWidget(QLabel("Whisper模型路径:")) self.whisper_edit = QLineEdit(self.whisper_model_path) whisper_layout.addWidget(self.whisper_edit) model_layout.addLayout(whisper_layout) # Pyannote模型路径 pyannote_layout = QHBoxLayout() pyannote_layout.addWidget(QLabel("Pyannote模型路径:")) self.pyannote_edit = QLineEdit(self.pyannote_model_path) pyannote_layout.addWidget(self.pyannote_edit) model_layout.addLayout(pyannote_layout) # 情感分析模型路径 emotion_layout = QHBoxLayout() emotion_layout.addWidget(QLabel("情感分析模型路径:")) self.emotion_edit = QLineEdit(self.emotion_model_path) emotion_layout.addWidget(self.emotion_edit) model_layout.addLayout(emotion_layout) # 输出目录 output_layout = QHBoxLayout() output_layout.addWidget(QLabel("输出目录:")) self.output_edit = QLineEdit(self.output_dir) output_layout.addWidget(self.output_edit) self.output_browse_btn = QPushButton("浏览...") self.output_browse_btn.clicked.connect(self.browse_output) output_layout.addWidget(self.output_browse_btn) model_layout.addLayout(output_layout) main_layout.addWidget(model_group) # 控制按钮区域 control_layout = QHBoxLayout() self.start_btn = QPushButton("开始分析") self.start_btn.clicked.connect(self.start_analysis) control_layout.addWidget(self.start_btn) self.stop_btn = QPushButton("停止分析") self.stop_btn.clicked.connect(self.stop_analysis) self.stop_btn.setEnabled(False) control_layout.addWidget(self.stop_btn) self.clear_btn = QPushButton("清空") self.clear_btn.clicked.connect(self.clear_all) control_layout.addWidget(self.clear_btn) main_layout.addLayout(control_layout) # 进度条 self.progress_bar = QProgressBar() self.progress_bar.setValue(0) main_layout.addWidget(self.progress_bar) # 日志输出区域 log_group = QGroupBox("分析日志") log_layout = QVBoxLayout(log_group) self.log_text = QTextEdit() self.log_text.setReadOnly(True) log_layout.addWidget(self.log_text) main_layout.addWidget(log_group) # 状态区域 status_layout = QHBoxLayout() self.status_label = QLabel("就绪") status_layout.addWidget(self.status_label) self.file_count_label = QLabel("已选择0个音频文件") status_layout.addWidget(self.file_count_label) main_layout.addLayout(status_layout) # 初始化分析线程 self.analysis_thread = None def browse_audio(self): """浏览音频文件或文件夹""" options = QFileDialog.Options() files, _ = QFileDialog.getOpenFileNames( self, "选择音频文件", "", "音频文件 (*.mp3 *.wav *.amr *.ogg *.flac);;所有文件 (*)", options=options ) if files: self.audio_files = files self.audio_path_edit.setText("; ".join(files)) self.file_count_label.setText(f"已选择{len(files)}个音频文件") self.log_text.append(f"已选择{len(files)}个音频文件") def browse_keyword(self): """浏览关键词文件""" options = QFileDialog.Options() file, _ = QFileDialog.getOpenFileName( self, "选择关键词文件", "", "Excel文件 (*.xlsx *.xls);;所有文件 (*)", options=options ) if file: self.keyword_file = file self.keyword_path_edit.setText(file) self.log_text.append(f"已选择关键词文件: {file}") def browse_output(self): """浏览输出目录""" options = QFileDialog.Options() directory = QFileDialog.getExistingDirectory( self, "选择输出目录", "", options=options ) if directory: self.output_dir = directory self.output_edit.setText(directory) self.log_text.append(f"输出目录设置为: {directory}") def start_analysis(self): """开始分析""" if not self.audio_files: self.log_text.append("错误: 请先选择音频文件") return if not self.keyword_file: self.log_text.append("错误: 请先选择关键词文件") return # 更新模型路径 self.whisper_model_path = self.whisper_edit.text() self.pyannote_model_path = self.pyannote_edit.text() self.emotion_model_path = self.emotion_edit.text() self.output_dir = self.output_edit.text() # 创建输出目录 os.makedirs(self.output_dir, exist_ok=True) self.log_text.append("开始分析...") self.start_btn.setEnabled(False) self.stop_btn.setEnabled(True) self.status_label.setText("分析中...") self.progress_bar.setValue(0) # 创建并启动分析线程 self.analysis_thread = AnalysisThread( self.audio_files, self.keyword_file, self.whisper_model_path, self.pyannote_model_path, self.emotion_model_path ) self.analysis_thread.progress.connect(self.progress_bar.setValue) self.analysis_thread.message.connect(self.log_text.append) self.analysis_thread.analysis_complete.connect(self.on_analysis_complete) self.analysis_thread.error.connect(self.on_analysis_error) self.analysis_thread.finished.connect(self.on_analysis_finished) self.analysis_thread.start() def stop_analysis(self): """停止分析""" if self.analysis_thread and self.analysis_thread.isRunning(): self.analysis_thread.stop() self.log_text.append("正在停止分析...") self.stop_btn.setEnabled(False) def clear_all(self): """清空所有内容""" self.audio_files = [] self.keyword_file = "" self.audio_path_edit.clear() self.keyword_path_edit.clear() self.log_text.clear() self.progress_bar.setValue(0) self.status_label.setText("就绪") self.file_count_label.setText("已选择0个音频文件") self.log_text.append("已清空所有内容") def on_analysis_complete(self, result): """分析完成处理""" try: self.log_text.append("正在生成报告...") if not result.get("results"): self.log_text.append("警告: 没有生成任何分析结果") return # 生成Excel报告 excel_path = os.path.join(self.output_dir, "质检分析报告.xlsx") self.generate_excel_report(result, excel_path) # 生成Word报告 word_path = os.path.join(self.output_dir, "质检分析报告.docx") self.generate_word_report(result, word_path) self.log_text.append(f"分析报告已保存至: {excel_path}") self.log_text.append(f"可视化报告已保存至: {word_path}") self.log_text.append("分析完成!") self.status_label.setText(f"分析完成!报告保存至: {self.output_dir}") except Exception as e: import traceback self.log_text.append(f"生成报告时出错: {str(e)}\n{traceback.format_exc()}") def on_analysis_error(self, message): """分析错误处理""" self.log_text.append(f"错误: {message}") self.status_label.setText("发生错误") def on_analysis_finished(self): """分析线程结束处理""" self.start_btn.setEnabled(True) self.stop_btn.setEnabled(False) def generate_excel_report(self, result, output_path): """生成Excel报告""" # 从结果中提取数据 data = [] for res in result['results']: data.append({ "文件名": res['file_name'], "音频时长(秒)": res['duration'], "开场白检查": "通过" if res['opening_check'] else "未通过", "结束语检查": "通过" if res['closing_check'] else "未通过", "服务禁语检查": "通过" if not res['forbidden_check'] else "未通过", "客服情感": res['agent_emotion']['label'], "客服情感得分": res['agent_emotion']['score'], "客户情感": res['customer_emotion']['label'], "客户情感得分": res['customer_emotion']['score'], "语速(字/分)": res['speech_rate'], "平均音量(dB)": res['volume_mean'], "音量标准差": res['volume_std'], "问题解决率": "是" if res['resolution_rate'] else "否" }) # 创建DataFrame并保存 df = pd.DataFrame(data) df.to_excel(output_path, index=False) # 添加汇总统计 try: with pd.ExcelWriter(output_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer: summary_data = { "统计项": ["总文件数", "开场白通过率", "结束语通过率", "服务禁语通过率", "问题解决率"], "数值": [ len(result['results']), df['开场白检查'].value_counts().get('通过', 0) / len(df), df['结束语检查'].value_counts().get('通过', 0) / len(df), df['服务禁语检查'].value_counts().get('通过', 0) / len(df), df['问题解决率'].value_counts().get('是', 0) / len(df) ] } summary_df = pd.DataFrame(summary_data) summary_df.to_excel(writer, sheet_name='汇总统计', index=False) except Exception as e: self.log_text.append(f"添加汇总统计时出错: {str(e)}") def generate_word_report(self, result, output_path): """生成Word报告""" doc = Document() # 添加标题 doc.add_heading('外呼电话录音质检分析报告', 0) # 添加基本信息 doc.add_heading('分析概况', level=1) doc.add_paragraph(f"分析时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") doc.add_paragraph(f"分析文件数量: {len(result['results'])}") doc.add_paragraph(f"关键词文件: {os.path.basename(self.keyword_file)}") # 添加汇总统计 doc.add_heading('汇总统计', level=1) # 创建汇总表格 table = doc.add_table(rows=5, cols=2) table.style = 'Table Grid' # 表头 hdr_cells = table.rows[0].cells hdr_cells[0].text = '统计项' hdr_cells[1].text = '数值' # 计算统计数据 df = pd.DataFrame(result['results']) pass_rates = { "开场白通过率": df['opening_check'].mean() if not df.empty else 0, "结束语通过率": df['closing_check'].mean() if not df.empty else 0, "服务禁语通过率": (1 - df['forbidden_check']).mean() if not df.empty else 0, "问题解决率": df['resolution_rate'].mean() if not df.empty else 0 } # 填充表格 rows = [ ("总文件数", len(result['results'])), ("开场白通过率", f"{pass_rates['开场白通过率']:.2%}"), ("结束语通过率", f"{pass_rates['结束语通过率']:.2%}"), ("服务禁语通过率", f"{pass_rates['服务禁语通过率']:.2%}"), ("问题解决率", f"{pass_rates['问题解决率']:.2%}") ] for i, row_data in enumerate(rows): if i < len(table.rows): row_cells = table.rows[i].cells row_cells[0].text = row_data[0] row_cells[1].text = str(row_data[1]) # 添加情感分析图表 if result['results']: doc.add_heading('情感分析', level=1) # 客服情感分布 agent_emotions = [res['agent_emotion']['label'] for res in result['results']] agent_emotion_counts = pd.Series(agent_emotions).value_counts() if not agent_emotion_counts.empty: fig, ax = plt.subplots(figsize=(6, 4)) agent_emotion_counts.plot.pie(autopct='%1.1f%%', ax=ax) ax.set_title('客服情感分布') plt.tight_layout() # 保存图表到临时文件 chart_path = os.path.join(self.output_dir, "agent_emotion_chart.png") plt.savefig(chart_path, dpi=100) plt.close() doc.add_picture(chart_path, width=Inches(4)) doc.add_paragraph('图1: 客服情感分布') # 客户情感分布 customer_emotions = [res['customer_emotion']['label'] for res in result['results']] customer_emotion_counts = pd.Series(customer_emotions).value_counts() if not customer_emotion_counts.empty: fig, ax = plt.subplots(figsize=(6, 4)) customer_emotion_counts.plot.pie(autopct='%1.1f%%', ax=ax) ax.set_title('客户情感分布') plt.tight_layout() chart_path = os.path.join(self.output_dir, "customer_emotion_chart.png") plt.savefig(chart_path, dpi=100) plt.close() doc.add_picture(chart_path, width=Inches(4)) doc.add_paragraph('图2: 客户情感分布') # 添加详细分析结果 doc.add_heading('详细分析结果', level=1) # 创建详细表格 table = doc.add_table(rows=1, cols=6) table.style = 'Table Grid' # 表头 hdr_cells = table.rows[0].cells headers = ['文件名', '开场白', '结束语', '禁语', '客服情感', '问题解决'] for i, header in enumerate(headers): hdr_cells[i].text = header # 填充数据 for res in result['results']: row_cells = table.add_row().cells row_cells[0].text = res['file_name'] row_cells[1].text = "✓" if res['opening_check'] else "✗" row_cells[2].text = "✓" if res['closing_check'] else "✗" row_cells[3].text = "✗" if res['forbidden_check'] else "✓" row_cells[4].text = res['agent_emotion']['label'] row_cells[5].text = "✓" if res['resolution_rate'] else "✗" # 保存文档 doc.save(output_path) if __name__ == "__main__": # 检查是否安装了torch try: import torch except ImportError: print("警告: PyTorch 未安装,情感分析可能无法使用GPU加速") app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec_())

filetype

检查代码并优化: import sys import os import json import time import wave import numpy as np import pandas as pd import matplotlib.pyplot as plt import soundfile as sf # 新增依赖,用于音频片段保存 from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLabel, QLineEdit, QTextEdit, QFileDialog, QProgressBar, QGroupBox, QComboBox, QCheckBox) from PyQt5.QtCore import QThread, pyqtSignal from pydub import AudioSegment from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification import whisper from pyannote.audio import Pipeline from docx import Document from docx.shared import Inches import librosa import tempfile from collections import defaultdict import re # 新增用于中文字符计数 class AnalysisThread(QThread): progress = pyqtSignal(int) message = pyqtSignal(str) analysis_complete = pyqtSignal(dict) error = pyqtSignal(str) def __init__(self, audio_files, keyword_file, whisper_model_path, pyannote_model_path, emotion_model_path): super().__init__() self.audio_files = audio_files self.keyword_file = keyword_file self.whisper_model_path = whisper_model_path self.pyannote_model_path = pyannote_model_path self.emotion_model_path = emotion_model_path self.running = True self.cached_models = {} def run(self): try: # 加载关键词 self.message.emit("正在加载关键词...") keywords = self.load_keywords() # 预加载模型 self.message.emit("正在预加载模型...") self.preload_models() results = [] total_files = len(self.audio_files) for idx, audio_file in enumerate(self.audio_files): if not self.running: self.message.emit("分析已停止") return self.message.emit(f"正在处理文件: {os.path.basename(audio_file)} ({idx+1}/{total_files})") file_result = self.analyze_file(audio_file, keywords) if file_result: results.append(file_result) self.progress.emit(int((idx + 1) / total_files * 100)) self.analysis_complete.emit({"results": results, "keywords": keywords}) self.message.emit("分析完成!") except Exception as e: import traceback self.error.emit(f"分析过程中发生错误: {str(e)}\n{traceback.format_exc()}") def preload_models(self): """预加载所有模型到缓存""" # 检查是否已加载模型 if hasattr(self, 'cached_models') and self.cached_models: return self.cached_models = {} # 加载语音识别模型 if 'whisper' not in self.cached_models: self.message.emit("正在加载语音识别模型...") self.cached_models['whisper'] = whisper.load_model(self.whisper_model_path) # 加载说话人分离模型 if 'pyannote' not in self.cached_models: self.message.emit("正在加载说话人分离模型...") self.cached_models['pyannote'] = Pipeline.from_pretrained(self.pyannote_model_path) # 加载情感分析模型 if 'emotion_classifier' not in self.cached_models: self.message.emit("正在加载情感分析模型...") tokenizer = AutoTokenizer.from_pretrained(self.emotion_model_path) model = AutoModelForSequenceClassification.from_pretrained(self.emotion_model_path) self.cached_models['emotion_classifier'] = pipeline( "text-classification", model=model, tokenizer=tokenizer, device=0 if torch.cuda.is_available() else -1 # 使用GPU如果可用 ) def analyze_file(self, audio_file, keywords): """分析单个音频文件""" try: # 确保音频为WAV格式 wav_file = self.convert_to_wav(audio_file) # 获取音频信息 duration, sample_rate, channels = self.get_audio_info(wav_file) # 说话人分离 diarization = self.cached_models['pyannote'](wav_file) # 识别客服和客户(使用改进的方法) agent_segments, customer_segments = self.identify_speakers(wav_file, diarization, keywords['opening']) # 语音识别(使用优化后的方法) whisper_model = self.cached_models['whisper'] agent_text = self.transcribe_audio(wav_file, agent_segments, whisper_model) customer_text = self.transcribe_audio(wav_file, customer_segments, whisper_model) # 情感分析 emotion_classifier = self.cached_models['emotion_classifier'] agent_emotion = self.analyze_emotion(agent_text, emotion_classifier) customer_emotion = self.analyze_emotion(customer_text, emotion_classifier) # 服务规范检查 opening_check = self.check_opening(agent_text, keywords['opening']) closing_check = self.check_closing(agent_text, keywords['closing']) forbidden_check = self.check_forbidden(agent_text, keywords['forbidden']) # 沟通技巧分析(使用改进的方法) speech_rate = self.analyze_speech_rate(wav_file, agent_segments) volume_analysis = self.analyze_volume(wav_file, agent_segments) # 问题解决率分析 resolution_rate = self.analyze_resolution(agent_text, customer_text, keywords['resolution']) # 构建结果 return { "file_name": os.path.basename(audio_file), "duration": duration, "agent_text": agent_text, "customer_text": customer_text, "opening_check": opening_check, "closing_check": closing_check, "forbidden_check": forbidden_check, "agent_emotion": agent_emotion, "customer_emotion": customer_emotion, "speech_rate": speech_rate, "volume_mean": volume_analysis['mean'], "volume_std": volume_analysis['std'], "resolution_rate": resolution_rate } except Exception as e: self.error.emit(f"处理文件 {os.path.basename(audio_file)} 时出错: {str(e)}") return None def load_keywords(self): """从Excel文件加载关键词""" try: df = pd.read_excel(self.keyword_file) keywords = { "opening": [str(k).strip() for k in df['opening'].dropna().tolist()], "closing": [str(k).strip() for k in df['closing'].dropna().tolist()], "forbidden": [str(k).strip() for k in df['forbidden'].dropna().tolist()], "resolution": [str(k).strip() for k in df['resolution'].dropna().tolist()] } return keywords except Exception as e: raise Exception(f"加载关键词文件失败: {str(e)}") def convert_to_wav(self, audio_file): """将音频文件转换为WAV格式(如果需要)""" try: if not audio_file.lower().endswith('.wav'): # 使用临时文件避免磁盘IO with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmpfile: output_file = tmpfile.name audio = AudioSegment.from_file(audio_file) audio.export(output_file, format='wav') return output_file return audio_file except Exception as e: raise Exception(f"音频转换失败: {str(e)}") def get_audio_info(self, wav_file): """获取音频文件信息""" try: with wave.open(wav_file, 'rb') as wf: frames = wf.getnframes() rate = wf.getframerate() channels = wf.getnchannels() duration = frames / float(rate) return duration, rate, channels except Exception as e: raise Exception(f"获取音频信息失败: {str(e)}") def identify_speakers(self, wav_file, diarization, opening_keywords): """改进的客服识别方法 - 检查前三个片段是否有开场白关键词""" speaker_segments = defaultdict(list) for segment, _, speaker in diarization.itertracks(yield_label=True): speaker_segments[speaker].append((segment.start, segment.end)) # 如果没有说话人 if not speaker_segments: return [], [] # 如果只有一个说话人 if len(speaker_segments) == 1: speaker = list(speaker_segments.keys())[0] return speaker_segments[speaker], [] # 检查每个说话人的前三个片段是否有开场白 speaker_scores = {} whisper_model = self.cached_models['whisper'] for speaker, segments in speaker_segments.items(): score = 0 # 取前三个片段(或所有片段如果少于3个) check_segments = segments[:3] for start, end in check_segments: # 转录片段 text = self.transcribe_audio_segment(wav_file, [(start, end)], whisper_model) # 检查开场白关键词 for keyword in opening_keywords: if keyword and keyword in text: score += 1 break speaker_scores[speaker] = score # 找到得分最高的说话人作为客服 agent_speaker = max(speaker_scores, key=speaker_scores.get) agent_segments = [] customer_segments = [] for speaker, segments in speaker_segments.items(): if speaker == agent_speaker: agent_segments = segments else: customer_segments.extend(segments) return agent_segments, customer_segments def transcribe_audio_segment(self, wav_file, segments, model): """转录单个音频片段 - 用于客服识别""" if not segments: return "" # 使用pydub加载音频 audio = AudioSegment.from_wav(wav_file) start, end = segments[0] # 转换为毫秒 start_ms = int(start * 1000) end_ms = int(end * 1000) segment_audio = audio[start_ms:end_ms] # 使用临时文件 with tempfile.NamedTemporaryFile(suffix='.wav') as tmpfile: segment_audio.export(tmpfile.name, format="wav") result = model.transcribe(tmpfile.name) return result['text'] def transcribe_audio(self, wav_file, segments, model): """优化后的转录方法 - 按片段转录""" if not segments: return "" # 使用pydub加载音频 audio = AudioSegment.from_wav(wav_file) full_text = "" # 只处理指定片段 for start, end in segments: # 转换为毫秒 start_ms = int(start * 1000) end_ms = int(end * 1000) segment_audio = audio[start_ms:end_ms] # 使用临时文件避免内存占用 with tempfile.NamedTemporaryFile(suffix='.wav') as tmpfile: segment_audio.export(tmpfile.name, format="wav") result = model.transcribe(tmpfile.name) full_text += result['text'] + " " return full_text.strip() def analyze_emotion(self, text, classifier): """分析文本情感""" if not text.strip(): return {"label": "中性", "score": 0.0} # 截断长文本以提高性能 if len(text) > 500: text = text[:500] result = classifier(text, truncation=True, max_length=512) return { "label": result[0]['label'], "score": result[0]['score'] } def check_opening(self, text, opening_keywords): """检查开场白""" return any(keyword in text for keyword in opening_keywords if keyword) def check_closing(self, text, closing_keywords): """检查结束语""" return any(keyword in text for keyword in closing_keywords if keyword) def check_forbidden(self, text, forbidden_keywords): """检查服务禁语""" return any(keyword in text for keyword in forbidden_keywords if keyword) def analyze_speech_rate(self, wav_file, segments): """改进的语速分析 - 基于实际识别文本""" if not segments: return 0 # 加载音频 y, sr = librosa.load(wav_file, sr=None) total_chars = 0 total_duration = 0 whisper_model = self.cached_models['whisper'] for start, end in segments: # 计算片段时长(秒) duration = end - start total_duration += duration # 转录片段 text = self.transcribe_audio_segment(wav_file, [(start, end)], whisper_model) # 计算中文字符数(去除标点和空格) chinese_chars = sum(1 for char in text if '\u4e00' <= char <= '\u9fff') total_chars += chinese_chars if total_duration == 0: return 0 # 语速 = 总字数 / 总时长(分钟) return total_chars / (total_duration / 60) def analyze_volume(self, wav_file, segments): """改进的音量分析 - 使用librosa计算RMS分贝值""" if not segments: return {"mean": -60, "std": 0} # 加载音频 y, sr = librosa.load(wav_file, sr=None) all_dB = [] for start, end in segments: start_sample = int(start * sr) end_sample = int(end * sr) segment_audio = y[start_sample:end_sample] # 计算RMS并转换为dB rms = librosa.feature.rms(y=segment_audio)[0] dB = librosa.amplitude_to_db(rms, ref=np.max) all_dB.extend(dB) if not all_dB: return {"mean": -60, "std": 0} return { "mean": float(np.mean(all_dB)), "std": float(np.std(all_dB)) } def analyze_resolution(self, agent_text, customer_text, resolution_keywords): """分析问题解决率""" return any(keyword in agent_text for keyword in resolution_keywords if keyword) def stop(self): """停止分析""" self.running = False class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("外呼电话录音包质检分析系统") self.setGeometry(100, 100, 1000, 700) # 初始化变量 self.audio_files = [] self.keyword_file = "" self.whisper_model_path = "./models/whisper-small" self.pyannote_model_path = "./models/pyannote-speaker-diarization" self.emotion_model_path = "./models/Erlangshen-Roberta-110M-Sentiment" self.output_dir = "./reports" # 创建主控件 central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # 文件选择区域 file_group = QGroupBox("文件选择") file_layout = QVBoxLayout(file_group) # 音频文件选择 audio_layout = QHBoxLayout() self.audio_label = QLabel("音频文件/文件夹:") audio_layout.addWidget(self.audio_label) self.audio_path_edit = QLineEdit() audio_layout.addWidget(self.audio_path_edit) self.audio_browse_btn = QPushButton("浏览...") self.audio_browse_btn.clicked.connect(self.browse_audio) audio_layout.addWidget(self.audio_browse_btn) file_layout.addLayout(audio_layout) # 关键词文件选择 keyword_layout = QHBoxLayout() self.keyword_label = QLabel("关键词文件:") keyword_layout.addWidget(self.keyword_label) self.keyword_path_edit = QLineEdit() keyword_layout.addWidget(self.keyword_path_edit) self.keyword_browse_btn = QPushButton("浏览...") self.keyword_browse_btn.clicked.connect(self.browse_keyword) keyword_layout.addWidget(self.keyword_browse_btn) file_layout.addLayout(keyword_layout) main_layout.addWidget(file_group) # 模型设置区域 model_group = QGroupBox("模型设置") model_layout = QVBoxLayout(model_group) # Whisper模型路径 whisper_layout = QHBoxLayout() whisper_layout.addWidget(QLabel("Whisper模型路径:")) self.whisper_edit = QLineEdit(self.whisper_model_path) whisper_layout.addWidget(self.whisper_edit) model_layout.addLayout(whisper_layout) # Pyannote模型路径 pyannote_layout = QHBoxLayout() pyannote_layout.addWidget(QLabel("Pyannote模型路径:")) self.pyannote_edit = QLineEdit(self.pyannote_model_path) pyannote_layout.addWidget(self.pyannote_edit) model_layout.addLayout(pyannote_layout) # 情感分析模型路径 emotion_layout = QHBoxLayout() emotion_layout.addWidget(QLabel("情感分析模型路径:")) self.emotion_edit = QLineEdit(self.emotion_model_path) emotion_layout.addWidget(self.emotion_edit) model_layout.addLayout(emotion_layout) # 输出目录 output_layout = QHBoxLayout() output_layout.addWidget(QLabel("输出目录:")) self.output_edit = QLineEdit(self.output_dir) output_layout.addWidget(self.output_edit) self.output_browse_btn = QPushButton("浏览...") self.output_browse_btn.clicked.connect(self.browse_output) output_layout.addWidget(self.output_browse_btn) model_layout.addLayout(output_layout) main_layout.addWidget(model_group) # 控制按钮区域 control_layout = QHBoxLayout() self.start_btn = QPushButton("开始分析") self.start_btn.clicked.connect(self.start_analysis) control_layout.addWidget(self.start_btn) self.stop_btn = QPushButton("停止分析") self.stop_btn.clicked.connect(self.stop_analysis) self.stop_btn.setEnabled(False) control_layout.addWidget(self.stop_btn) self.clear_btn = QPushButton("清空") self.clear_btn.clicked.connect(self.clear_all) control_layout.addWidget(self.clear_btn) main_layout.addLayout(control_layout) # 进度条 self.progress_bar = QProgressBar() self.progress_bar.setValue(0) main_layout.addWidget(self.progress_bar) # 日志输出区域 log_group = QGroupBox("分析日志") log_layout = QVBoxLayout(log_group) self.log_text = QTextEdit() self.log_text.setReadOnly(True) log_layout.addWidget(self.log_text) main_layout.addWidget(log_group) # 状态区域 status_layout = QHBoxLayout() self.status_label = QLabel("就绪") status_layout.addWidget(self.status_label) self.file_count_label = QLabel("已选择0个音频文件") status_layout.addWidget(self.file_count_label) main_layout.addLayout(status_layout) # 初始化分析线程 self.analysis_thread = None def browse_audio(self): """浏览音频文件或文件夹""" options = QFileDialog.Options() files, _ = QFileDialog.getOpenFileNames( self, "选择音频文件", "", "音频文件 (*.mp3 *.wav *.amr *.ogg *.flac);;所有文件 (*)", options=options ) if files: self.audio_files = files self.audio_path_edit.setText("; ".join(files)) self.file_count_label.setText(f"已选择{len(files)}个音频文件") self.log_text.append(f"已选择{len(files)}个音频文件") def browse_keyword(self): """浏览关键词文件""" options = QFileDialog.Options() file, _ = QFileDialog.getOpenFileName( self, "选择关键词文件", "", "Excel文件 (*.xlsx *.xls);;所有文件 (*)", options=options ) if file: self.keyword_file = file self.keyword_path_edit.setText(file) self.log_text.append(f"已选择关键词文件: {file}") def browse_output(self): """浏览输出目录""" options = QFileDialog.Options() directory = QFileDialog.getExistingDirectory( self, "选择输出目录", "", options=options ) if directory: self.output_dir = directory self.output_edit.setText(directory) self.log_text.append(f"输出目录设置为: {directory}") def start_analysis(self): """开始分析""" if not self.audio_files: self.log_text.append("错误: 请先选择音频文件") return if not self.keyword_file: self.log_text.append("错误: 请先选择关键词文件") return # 更新模型路径 self.whisper_model_path = self.whisper_edit.text() self.pyannote_model_path = self.pyannote_edit.text() self.emotion_model_path = self.emotion_edit.text() self.output_dir = self.output_edit.text() # 创建输出目录 os.makedirs(self.output_dir, exist_ok=True) self.log_text.append("开始分析...") self.start_btn.setEnabled(False) self.stop_btn.setEnabled(True) self.status_label.setText("分析中...") self.progress_bar.setValue(0) # 创建并启动分析线程 self.analysis_thread = AnalysisThread( self.audio_files, self.keyword_file, self.whisper_model_path, self.pyannote_model_path, self.emotion_model_path ) self.analysis_thread.progress.connect(self.progress_bar.setValue) self.analysis_thread.message.connect(self.log_text.append) self.analysis_thread.analysis_complete.connect(self.on_analysis_complete) self.analysis_thread.error.connect(self.on_analysis_error) self.analysis_thread.finished.connect(self.on_analysis_finished) self.analysis_thread.start() def stop_analysis(self): """停止分析""" if self.analysis_thread and self.analysis_thread.isRunning(): self.analysis_thread.stop() self.log_text.append("正在停止分析...") self.stop_btn.setEnabled(False) def clear_all(self): """清空所有内容""" self.audio_files = [] self.keyword_file = "" self.audio_path_edit.clear() self.keyword_path_edit.clear() self.log_text.clear() self.progress_bar.setValue(0) self.status_label.setText("就绪") self.file_count_label.setText("已选择0个音频文件") self.log_text.append("已清空所有内容") def on_analysis_complete(self, result): """分析完成处理""" try: self.log_text.append("正在生成报告...") if not result.get("results"): self.log_text.append("警告: 没有生成任何分析结果") return # 生成Excel报告 excel_path = os.path.join(self.output_dir, "质检分析报告.xlsx") self.generate_excel_report(result, excel_path) # 生成Word报告 word_path = os.path.join(self.output_dir, "质检分析报告.docx") self.generate_word_report(result, word_path) self.log_text.append(f"分析报告已保存至: {excel_path}") self.log_text.append(f"可视化报告已保存至: {word_path}") self.log_text.append("分析完成!") self.status_label.setText(f"分析完成!报告保存至: {self.output_dir}") except Exception as e: import traceback self.log_text.append(f"生成报告时出错: {str(e)}\n{traceback.format_exc()}") def on_analysis_error(self, message): """分析错误处理""" self.log_text.append(f"错误: {message}") self.status_label.setText("发生错误") def on_analysis_finished(self): """分析线程结束处理""" self.start_btn.setEnabled(True) self.stop_btn.setEnabled(False) def generate_excel_report(self, result, output_path): """生成Excel报告""" # 从结果中提取数据 data = [] for res in result['results']: data.append({ "文件名": res['file_name'], "音频时长(秒)": res['duration'], "开场白检查": "通过" if res['opening_check'] else "未通过", "结束语检查": "通过" if res['closing_check'] else "未通过", "服务禁语检查": "通过" if not res['forbidden_check'] else "未通过", "客服情感": res['agent_emotion']['label'], "客服情感得分": res['agent_emotion']['score'], "客户情感": res['customer_emotion']['label'], "客户情感得分": res['customer_emotion']['score'], "语速(字/分)": res['speech_rate'], "平均音量(dB)": res['volume_mean'], "音量标准差": res['volume_std'], "问题解决率": "是" if res['resolution_rate'] else "否" }) # 创建DataFrame并保存 df = pd.DataFrame(data) df.to_excel(output_path, index=False) # 添加汇总统计 try: with pd.ExcelWriter(output_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer: summary_data = { "统计项": ["总文件数", "开场白通过率", "结束语通过率", "服务禁语通过率", "问题解决率"], "数值": [ len(result['results']), df['开场白检查'].value_counts().get('通过', 0) / len(df), df['结束语检查'].value_counts().get('通过', 0) / len(df), df['服务禁语检查'].value_counts().get('通过', 0) / len(df), df['问题解决率'].value_counts().get('是', 0) / len(df) ] } summary_df = pd.DataFrame(summary_data) summary_df.to_excel(writer, sheet_name='汇总统计', index=False) except Exception as e: self.log_text.append(f"添加汇总统计时出错: {str(e)}") def generate_word_report(self, result, output_path): """生成Word报告""" doc = Document() # 添加标题 doc.add_heading('外呼电话录音质检分析报告', 0) # 添加基本信息 doc.add_heading('分析概况', level=1) doc.add_paragraph(f"分析时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") doc.add_paragraph(f"分析文件数量: {len(result['results'])}") doc.add_paragraph(f"关键词文件: {os.path.basename(self.keyword_file)}") # 添加汇总统计 doc.add_heading('汇总统计', level=1) # 创建汇总表格 table = doc.add_table(rows=5, cols=2) table.style = 'Table Grid' # 表头 hdr_cells = table.rows[0].cells hdr_cells[0].text = '统计项' hdr_cells[1].text = '数值' # 计算统计数据 df = pd.DataFrame(result['results']) pass_rates = { "开场白通过率": df['opening_check'].mean() if not df.empty else 0, "结束语通过率": df['closing_check'].mean() if not df.empty else 0, "服务禁语通过率": (1 - df['forbidden_check']).mean() if not df.empty else 0, "问题解决率": df['resolution_rate'].mean() if not df.empty else 0 } # 填充表格 rows = [ ("总文件数", len(result['results'])), ("开场白通过率", f"{pass_rates['开场白通过率']:.2%}"), ("结束语通过率", f"{pass_rates['结束语通过率']:.2%}"), ("服务禁语通过率", f"{pass_rates['服务禁语通过率']:.2%}"), ("问题解决率", f"{pass_rates['问题解决率']:.2%}") ] for i, row_data in enumerate(rows): if i < len(table.rows): row_cells = table.rows[i].cells row_cells[0].text = row_data[0] row_cells[1].text = str(row_data[1]) # 添加情感分析图表 if result['results']: doc.add_heading('情感分析', level=1) # 客服情感分布 agent_emotions = [res['agent_emotion']['label'] for res in result['results']] agent_emotion_counts = pd.Series(agent_emotions).value_counts() if not agent_emotion_counts.empty: fig, ax = plt.subplots(figsize=(6, 4)) agent_emotion_counts.plot.pie(autopct='%1.1f%%', ax=ax) ax.set_title('客服情感分布') plt.tight_layout() # 保存图表到临时文件 chart_path = os.path.join(self.output_dir, "agent_emotion_chart.png") plt.savefig(chart_path, dpi=100) plt.close() doc.add_picture(chart_path, width=Inches(4)) doc.add_paragraph('图1: 客服情感分布') # 客户情感分布 customer_emotions = [res['customer_emotion']['label'] for res in result['results']] customer_emotion_counts = pd.Series(customer_emotions).value_counts() if not customer_emotion_counts.empty: fig, ax = plt.subplots(figsize=(6, 4)) customer_emotion_counts.plot.pie(autopct='%1.1f%%', ax=ax) ax.set_title('客户情感分布') plt.tight_layout() chart_path = os.path.join(self.output_dir, "customer_emotion_chart.png") plt.savefig(chart_path, dpi=100) plt.close() doc.add_picture(chart_path, width=Inches(4)) doc.add_paragraph('图2: 客户情感分布') # 添加详细分析结果 doc.add_heading('详细分析结果', level=1) # 创建详细表格 table = doc.add_table(rows=1, cols=6) table.style = 'Table Grid' # 表头 hdr_cells = table.rows[0].cells headers = ['文件名', '开场白', '结束语', '禁语', '客服情感', '问题解决'] for i, header in enumerate(headers): hdr_cells[i].text = header # 填充数据 for res in result['results']: row_cells = table.add_row().cells row_cells[0].text = res['file_name'] row_cells[1].text = "✓" if res['opening_check'] else "✗" row_cells[2].text = "✓" if res['closing_check'] else "✗" row_cells[3].text = "✗" if res['forbidden_check'] else "✓" row_cells[4].text = res['agent_emotion']['label'] row_cells[5].text = "✓" if res['resolution_rate'] else "✗" # 保存文档 doc.save(output_path) if __name__ == "__main__": # 检查是否安装了torch try: import torch except ImportError: print("警告: PyTorch 未安装,情感分析可能无法使用GPU加速") app = QApplication(sys.argv) window = MainWindow()

filetype

import pandas as pd import numpy as np import paddle import paddlenlp import os import time from paddlenlp.transformers import AutoModelForSequenceClassification, AutoTokenizer from paddlenlp.datasets import DatasetBuilder from sklearn.model_selection import train_test_split from sklearn.metrics import classification_report, accuracy_score, f1_score, recall_score, precision_score, roc_auc_score # 新增roc_auc_score # 参数配置 MODEL_NAME = 'ernie-3.0-medium-zh' BATCH_SIZE = 32 MAX_SEQ_LENGTH = 128 EPOCHS = 5 SEED = 42 CKPT_DIR = "ernie_ckpt" # 固定随机种子 np.random.seed(SEED) paddle.seed(SEED) # 1. 数据准备 df = pd.read_excel('模型语库_4.xlsx') print("类别分布:\n", df["是否进行数字化转型"].value_counts()) # 划分数据集 train_df, temp_df = train_test_split(df, test_size=0.2, stratify=df["是否进行数字化转型"], random_state=SEED) val_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df["是否进行数字化转型"], random_state=SEED) print(f"训练集: {len(train_df)}, 验证集: {len(val_df)}, 测试集: {len(test_df)}") # 2. 创建Paddle数据集 tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) def convert_example(example): encoded_inputs = tokenizer( example["分割的句子"], max_seq_len=MAX_SEQ_LENGTH, padding='max_length', truncation=True ) return { "input_ids": encoded_inputs["input_ids"], "token_type_ids": encoded_inputs["token_type_ids"], "labels": example["是否进行数字化转型"] } class CustomDataset(DatasetBuilder): def __init__(self, data): self.data = data def __len__(self): return len(self.data) def __getitem__(self, idx): return convert_example(self.data.iloc[idx]) # 创建数据集和数据加载器 def collate_fn(batch): input_ids = paddle.to_tensor([item["input_ids"] for item in batch]) token_type_ids = paddle.to_tensor([item["token_type_ids"] for item in batch]) labels = paddle.to_tensor([item["labels"] for item in batch]) return input_ids, token_type_ids, labels train_dataset = CustomDataset(train_df) val_dataset = CustomDataset(val_df) test_dataset = CustomDataset(test_df) train_loader = paddle.io.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn) val_loader = paddle.io.DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn) test_loader = paddle.io.DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn) # 3. 模型定义 model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_classes=2) # 4. 训练配置 optimizer = paddle.optimizer.AdamW(learning_rate=1e-4, parameters=model.parameters()) criterion = paddle.nn.loss.CrossEntropyLoss() metric = paddle.metric.Accuracy() # 评估函数(修改:新增AUC计算) def evaluate(model, data_loader): model.eval() all_preds, all_probs, all_labels = [], [], [] # 新增all_probs with paddle.no_grad(): for batch in data_loader: input_ids, token_type_ids, labels = batch logits = model(input_ids, token_type_ids) # 获取预测概率(计算AUC所需) probs = paddle.nn.functional.softmax(logits, axis=1) preds = paddle.argmax(logits, axis=1) all_preds.extend(preds.numpy().tolist()) all_labels.extend(labels.numpy().tolist()) all_probs.extend(probs.numpy()[:, 1].tolist()) # 获取正类的概率 # 计算AUC auc = roc_auc_score(all_labels, all_probs) return ( accuracy_score(all_labels, all_preds), f1_score(all_labels, all_preds), precision_score(all_labels, all_preds), recall_score(all_labels, all_preds), auc # 新增AUC返回值 ) # 5. 训练循环(修改:使用AUC作为模型选择标准) global_step = 0 best_f1 = 0 # 将最佳指标改为AUC tic_train = time.time() for epoch in range(EPOCHS): model.train() for step, batch in enumerate(train_loader, start=1): input_ids, token_type_ids, labels = batch logits = model(input_ids, token_type_ids) loss = criterion(logits, labels) # 反向传播 loss.backward() optimizer.step() optimizer.clear_grad() # 更新指标 correct = metric.compute(logits, labels) metric.update(correct) global_step += 1 # 每10步打印训练信息 if global_step % 10 == 0: acc = metric.accumulate() metric.reset() speed = 10 / (time.time() - tic_train) tic_train = time.time() print(f"global step {global_step}, epoch: {epoch+1}, batch: {step}, " f"loss: {loss.numpy().item():.5f}, acc: {acc:.5f}, speed: {speed:.2f} step/s") # 每40步进行验证 if global_step % 40 == 0: val_acc, val_f1, val_precision, val_recall, val_auc = evaluate(model, val_loader) print(f"Evaluation @ step {global_step}: val_acc: {val_acc:.5f}, val_f1: {val_f1:.5f}, " f"val_precision: {val_precision:.5f}, val_recall: {val_recall:.5f}, val_auc: {val_auc:.5f}") # 保存最佳模型(改为使用AUC作为标准) if val_f1 > best_f1: best_f1 = val_f1 if not os.path.exists(CKPT_DIR): os.makedirs(CKPT_DIR) paddle.save(model.state_dict(), os.path.join(CKPT_DIR, "best_model.pdparams")) tokenizer.save_pretrained(CKPT_DIR) print(f"Saved best model @ step {global_step} with val_f1: {best_f1:.5f}") # 6. 最终测试评估(修改:新增AUC输出) print("\nTesting best model...") model.load_dict(paddle.load(os.path.join(CKPT_DIR, "best_model.pdparams"))) # 正确收集测试数据(修改:收集概率值用于AUC计算) test_preds, test_probs, test_labels = [], [], [] model.eval() with paddle.no_grad(): for batch in test_loader: input_ids, token_type_ids, labels = batch logits = model(input_ids, token_type_ids) probs = paddle.nn.functional.softmax(logits, axis=1) preds = paddle.argmax(logits, axis=1) test_preds.extend(preds.numpy().tolist()) test_labels.extend(labels.numpy().tolist()) test_probs.extend(probs.numpy()[:, 1].tolist()) # 获取正类的概率 # 计算测试集AUC test_auc = roc_auc_score(test_labels, test_probs) # 生成分类报告(新增AUC输出) print("测试集分类报告:") print(classification_report(test_labels, test_preds, target_names=['非转型', '转型'])) print(f"\n测试集 AUC: {test_auc:.5f}") # 输出AUC值 # 修改评估函数以返回完整的指标字典 def evaluate(model, data_loader, return_probs=False): model.eval() all_preds, all_probs, all_labels = [], [], [] with paddle.no_grad(): for batch in data_loader: input_ids, token_type_ids, labels = batch logits = model(input_ids, token_type_ids) # 获取预测概率 probs = paddle.nn.functional.softmax(logits, axis=1) preds = paddle.argmax(logits, axis=1) all_preds.extend(preds.numpy().tolist()) all_labels.extend(labels.numpy().tolist()) all_probs.extend(probs.numpy()[:, 1].tolist()) # 获取正类的概率 # 计算所有指标 metrics = { 'accuracy': accuracy_score(all_labels, all_preds), 'precision': precision_score(all_labels, all_preds), 'recall': recall_score(all_labels, all_preds), 'f1': f1_score(all_labels, all_preds), 'auc': roc_auc_score(all_labels, all_probs) } if return_probs: return metrics, all_probs else: return metrics # 7. 测试集评估 print("\n测试最佳模型...") model.load_dict(paddle.load(os.path.join(CKPT_DIR, "best_model.pdparams"))) test_metrics, test_probs = evaluate(model, test_loader, return_probs=True) print("\n测试集结果:") print(f"准确率 (Accuracy): {test_metrics['accuracy']:.4f}") print(f"精确率 (Precision): {test_metrics['precision']:.4f}") print(f"召回率 (Recall): {test_metrics['recall']:.4f}") print(f"F1分数: {test_metrics['f1']:.4f}") print(f"AUC值: {test_metrics['auc']:.4f}") # 如果需要,还可以打印分类报告 print("\n测试集分类报告:") print(classification_report(test_labels, test_preds, target_names=['非转型', '转型'])) paddlepaddle框架下修改使用ERNIE-BiLSTM模型

kudrei
  • 粉丝: 51
上传资源 快速赚钱