import os import re import time import torch import torch.nn as nn import pandas as pd import numpy as np from datetime import datetime from torch.utils.data import Dataset, DataLoader, random_split from tqdm import tqdm import pickle import mysql.connector from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from gensim.models import Word2Vec # SpaCy 用来取代 nltk 的低效文本处理 import spacy nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"]) # 使用 gensim 的 Word2Vec 和 KeyedVectors from gensim.models import Word2Vec, TfidfModel from gensim.corpora import Dictionary # 自定义 Preprocess(快速版本) STOPWORDS = spacy.lang.en.stop_words.STOP_WORDS def clean_text(text): return re.sub(r'[^a-zA-Z0-9\s]', '', str(text)).strip().lower() def tokenize(text): doc = nlp(clean_text(text)) return [token.text for token in doc if token.text not in STOPWORDS and token.text.isalnum()] def preprocess(text): tokens = tokenize(text) return " ".join(tokens) class SemanticMatchModel(nn.Module): def __init__(self, input_dim): super().__init__() self.fc1 = nn.Linear(input_dim, 256) self.bn1 = nn.BatchNorm1d(256) self.fc2 = nn.Linear(256, 128) self.bn2 = nn.BatchNorm1d(128) self.fc3 = nn.Linear(128, 64) self.bn3 = nn.BatchNorm1d(64) self.fc4 = nn.Linear(64, 1) self.dropout = nn.Dropout(0.3) self.relu = nn.ReLU() self.sigmoid = nn.Sigmoid() def forward(self, x): x = self.relu(self.bn1(self.fc1(x))) x = self.dropout(x) x = self.relu(self.bn2(self.fc2(x))) x = self.dropout(x) x = self.relu(self.bn3(self.fc3(x))) x = self.dropout(x) x = self.sigmoid(self.fc4(x)) return x class QADataset(Dataset): """ 数据集:将正样本 (question, answer) 与随机负样本 (question, random_answer) 拼接在一起, 其中正样本 label=1,负样本 label=0。 """ def __init__(self, qa_pairs, tfidf_vectorizer, negative_ratio=1.0): """ :param qa_pairs: [(question_text, answer_text), ...] :param tfidf_vectorizer: 已经fit好的 TfidfVectorizer :param negative_ratio: 每个正样本对应的负样本倍数 """ self.qa_pairs = qa_pairs self.vectorizer = tfidf_vectorizer self.samples = [] # 构造正样本 for i, (q, a) in enumerate(self.qa_pairs): self.samples.append((q, a, 1)) # label=1 # 构建负样本:random替换answer if negative_ratio > 0: negative_samples = [] total_pairs = len(self.qa_pairs) for i, (q, a) in enumerate(self.qa_pairs): for _ in range(int(negative_ratio)): rand_idx = np.random.randint(total_pairs) # 若随机到同一个qa对,就重新随机 while rand_idx == i: rand_idx = np.random.randint(total_pairs) neg_q, neg_a = self.qa_pairs[rand_idx] # 保持question不变,随机替换答案 negative_samples.append((q, neg_a, 0)) self.samples.extend(negative_samples) def __len__(self): return len(self.samples) def __getitem__(self, idx): q, a, label = self.samples[idx] q_vec = self.vectorizer.transform([preprocess(q)]).toarray()[0] a_vec = self.vectorizer.transform([preprocess(a)]).toarray()[0] pair_vec = np.concatenate((q_vec, a_vec)) return torch.tensor(pair_vec, dtype=torch.float32), torch.tensor(label, dtype=torch.float32) class KnowledgeBase: def __init__(self, host='localhost', user='root', password='hy188747', database='ubuntu_qa', table='qa_pair', model_dir=r"D:\NLP-PT\PT4\model", negative_ratio=1.0): print("🔄 初始化知识库...") self.host = host self.user = user self.password = password self.database = database self.table = table self.model_dir = model_dir self.negative_ratio = negative_ratio # 确保模型目录存在 os.makedirs(self.model_dir, exist_ok=True) self.qa_pairs = [] self.q_texts = [] self.a_texts = [] self.semantic_model = None self.word2vec_model = None self.tfidf_vectorizer = None self.tfidf_matrix = None # 第一步:从数据库载入数据 self.load_data_from_mysql() # 第二步:加载或缓存预处理后的文本 self.load_or_cache_processed_questions() # 第三步:加载 TF-IDF + 向量化 self.load_cached_tfidf() # 第四步:加载 Word2Vec 或使用缓存 self.load_cached_word2vec_model() # 第五步:加载 PyTorch 模型 model_path = os.path.join(self.model_dir, 'semantic_match_model.pth') if os.path.exists(model_path): self.load_model() def load_data_from_mysql(self): print("🔄 正在连接 MySQL,加载问答数据...") conn = mysql.connector.connect( host=self.host, user=self.user, password=self.password, database=self.database ) cursor = conn.cursor() query = f"SELECT question_text, answer_text FROM {self.table}" cursor.execute(query) rows = cursor.fetchall() conn.close() self.qa_pairs = [(row[0], row[1]) for row in rows] self.q_texts = [pair[0] for pair in self.qa_pairs] self.a_texts = [pair[1] for pair in self.qa_pairs] print(f"✅ 成功从 MySQL 加载 {len(self.qa_pairs)} 条问答数据。") def load_or_cache_processed_questions(self): """使用本地缓存避免每次都预处理大量数据""" cache_path = os.path.join(self.model_dir, 'processed_questions.pkl') if os.path.exists(cache_path): print("🔄 使用缓存预处理后的分词文本。") with open(cache_path, 'rb') as f: self.processed_q_list = pickle.load(f) else: print("🔄 正在预处理问题文本(首次较慢)...") self.processed_q_list = [preprocess(q) for q in self.q_texts] with open(cache_path, 'wb') as f: pickle.dump(self.processed_q_list, f) print("✅ 预处理缓存已保存。") def load_cached_tfidf(self): """加载已存在的 TfidfVectorizer 或构建""" cache_tfidf_matrix = os.path.join(self.model_dir, 'tfidf_matrix.npz') cache_qa_list = os.path.join(self.model_dir, 'tfidf_qa.pkl') tfidf_path = os.path.join(self.model_dir, 'tfidf_vectorizer.pkl') if os.path.exists(tfidf_path) and os.path.exists(cache_tfidf_matrix) and os.path.exists(cache_qa_list): print("🔄 加载 TF-IDF 缓存版本。") import joblib self.tfidf_vectorizer = joblib.load(tfidf_path) self.tfidf_matrix = np.load(cache_tfidf_matrix)['tfidf'] with open(cache_qa_list, 'rb') as f: self.tfidf_qa = pickle.load(f) else: print("🔄 创建并构建 TF-IDF(首次较慢)...") self.tfidf_vectorizer = TfidfVectorizer( tokenizer=lambda x: x.split(), lowercase=False, max_features=10000 ) self.tfidf_qa = self.processed_q_list self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.tfidf_qa).toarray() print("✅ TF-IDF 构建完成。") import joblib joblib.dump(self.tfidf_vectorizer, tfidf_path) np.savez_compressed(cache_tfidf_matrix, tfidf=self.tfidf_matrix) with open(cache_qa_list, 'wb') as f: pickle.dump(self.tfidf_qa, f) def load_cached_word2vec_model(self): """加载已训练好的 Word2Vec 模型,没有就训练""" word2vec_path = os.path.join(self.model_dir, 'word2vec.model') if os.path.exists(word2vec_path): print("🔄 加载缓存中的 Word2Vec 模型...") self.word2vec_model = Word2Vec.load(word2vec_path) else: print("🔄 训练 Word2Vec 模型(首次较慢)...") tokenized_questions = [preprocess(q).split() for q in self.q_texts] self.word2vec_model = Word2Vec( sentences=tokenized_questions, vector_size=100, window=5, min_count=1, workers=4 ) self.word2vec_model.save(word2vec_path) print("✅ Word2Vec 模型训练完成并保存。") def sentence_to_vec(self, sentence): """将句子转换为向量表示""" tokens = preprocess(sentence).split() if self.word2vec_model: vecs = [self.word2vec_model.wv[w] for w in tokens if w in self.word2vec_model.wv] return np.mean(vecs, axis=0) if vecs else np.zeros(self.word2vec_model.vector_size) else: # 没有 Word2Vec 模型时,使用 TF-IDF 向量 return self.tfidf_vectorizer.transform([preprocess(sentence)]).toarray()[0] def build_model(self, epochs=10, batch_size=128, lr=1e-3): """ 构建并训练语义匹配模型,包含训练集/验证集拆分与性能监控。 """ # 创建数据集 full_dataset = QADataset(self.qa_pairs, self.tfidf_vectorizer, negative_ratio=self.negative_ratio) # 划分训练集/验证集 train_size = int(len(full_dataset) * 0.8) val_size = len(full_dataset) - train_size train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size]) # 创建数据加载器 train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2) val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2) # 初始化模型 sample_input, _ = full_dataset[0] input_dim = sample_input.shape[0] self.semantic_model = SemanticMatchModel(input_dim) criterion = nn.BCELoss() optimizer = optim.Adam(self.semantic_model.parameters(), lr=lr) scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.9) # 训练模型 best_val_acc = 0.0 print("\n开始模型训练...") start_time = time.time() for epoch in range(epochs): self.semantic_model.train() total_loss, total_correct, total_samples = 0.0, 0, 0 for X_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs} - 训练中"): optimizer.zero_grad() outputs = self.semantic_model(X_batch).squeeze() loss = criterion(outputs, y_batch) loss.backward() optimizer.step() total_loss += loss.item() * len(y_batch) preds = (outputs >= 0.5).float() total_correct += (preds == y_batch).sum().item() total_samples += len(y_batch) train_loss = total_loss / total_samples train_acc = total_correct / total_samples # 验证阶段 self.semantic_model.eval() val_loss, val_correct, val_samples = 0.0, 0, 0 with torch.no_grad(): for X_val, y_val in val_loader: outputs_val = self.semantic_model(X_val).squeeze() loss_val = criterion(outputs_val, y_val) val_loss += loss_val.item() * len(y_val) preds_val = (outputs_val >= 0.5).float() val_correct += (preds_val == y_val).sum().item() val_samples += len(y_val) val_loss /= val_samples val_acc = val_correct / val_samples # 更新学习率 scheduler.step() print(f"Epoch [{epoch + 1}/{epochs}] | " f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | " f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}") # 保存最优模型 if val_acc > best_val_acc: best_val_acc = val_acc model_path = os.path.join(self.model_dir, 'semantic_match_model.pth') torch.save(self.semantic_model.state_dict(), model_path) print(f"✅ 新的最优模型已保存 (Val Acc: {best_val_acc:.4f})") end_time = time.time() print(f"\n训练完成,共耗时 {end_time - start_time:.2f} 秒。") # 加载最优模型权重 model_path = os.path.join(self.model_dir, 'semantic_match_model.pth') self.semantic_model.load_state_dict(torch.load(model_path)) self.semantic_model.eval() def load_model(self): """加载训练好的语义匹配 PyTorch 模型""" input_dim = self.tfidf_matrix.shape[1] * 2 model_path = os.path.join(self.model_dir, 'semantic_match_model.pth') self.semantic_model = SemanticMatchModel(input_dim) self.semantic_model.load_state_dict(torch.load(model_path, map_location='cpu')) self.semantic_model.eval() print("✅ 语义匹配模型加载完成。") def retrieve(self, query, semantic_topk=100): """ 检索接口:先通过 TF-IDF + 句向量评分做粗检,再对Top-K结果用语义模型做精检,返回最匹配的 QA。 """ # 粗检 query_tfidf = self.tfidf_vectorizer.transform([preprocess(query)]).toarray()[0] tfidf_scores = cosine_similarity([query_tfidf], self.tfidf_matrix).flatten() query_sent_vec = self.sentence_to_vec(query) sent_vecs = np.array([self.sentence_to_vec(q) for q in self.q_texts]) sent_scores = cosine_similarity([query_sent_vec], sent_vecs).flatten() sim_scores = tfidf_scores + sent_scores topk_indices = np.argpartition(sim_scores, -semantic_topk)[-semantic_topk:] topk_indices = topk_indices[np.argsort(sim_scores[topk_indices])[::-1]] # 精检 if self.semantic_model: with torch.no_grad(): batch_inputs = [] for i in topk_indices: q = preprocess(self.q_texts[i]) a = preprocess(self.a_texts[i]) q_vec = self.tfidf_vectorizer.transform([q]).toarray()[0] a_vec = self.tfidf_vectorizer.transform([a]).toarray()[0] pair_input = np.concatenate((q_vec, a_vec)) batch_inputs.append(pair_input) batch_inputs = torch.tensor(np.stack(batch_inputs), dtype=torch.float32) batch_scores = self.semantic_model(batch_inputs).squeeze().cpu().numpy() semantic_scores = batch_scores # 综合得分 final_scores = sim_scores[topk_indices] + semantic_scores best_idx = topk_indices[np.argmax(final_scores)] return self.qa_pairs[best_idx], final_scores.max() else: # 没有语义模型时,只使用粗检结果 best_idx = topk_indices[0] return self.qa_pairs[best_idx], sim_scores[best_idx] def recommend_similar(self, query, topk=3): """针对未命中答案的情况,推荐相似问题""" query_tfidf = self.tfidf_vectorizer.transform([preprocess(query)]).toarray()[0] scores = cosine_similarity([query_tfidf], self.tfidf_matrix).flatten() topk_idx = scores.argsort()[0][-topk:][::-1] return [(self.qa_pairs[i][0], self.qa_pairs[i][1]) for i in topk_idx] class FeedbackRecorder: """记录未回答问题""" def __init__(self, file_path='unanswered_questions.csv'): self.file_path = file_path if not os.path.exists(self.file_path): with open(self.file_path, 'w', newline='', encoding='utf-8') as f: import csv csv.writer(f).writerow(['time', 'question']) def record_question(self, question): with open(self.file_path, 'a', newline='', encoding='utf-8') as f: import csv writer = csv.writer(f) writer.writerow([datetime.now().isoformat(), question]) def main(): kb = KnowledgeBase( host='localhost', user='root', password='hy188747', database='ubuntu_qa', table='qa_pair', model_dir=r"D:\NLP-PT\PT4\model", negative_ratio=1.0 ) # 是否重新训练语义匹配模型 if input("是否重新训练语义匹配模型?(y/n): ").strip().lower() == 'y': kb.build_model( epochs=5, # 训练轮数 batch_size=128, # 批大小 lr=1e-3 # 学习率 ) recorder = FeedbackRecorder() print("\n🎯 智能知识问答系统已启动(输入'q'退出聊天)\n") while True: query = input("🧐 问题:") if query.strip().lower() == 'q': break result, score = kb.retrieve(query) if result: print("💡 回答:", result[1]) print(f"📊 匹配信心分数: {score:.4f}\n") else: print("⚠ 没有找到合适的答案,已将你的问题记录下来。") recorder.record_question(query) print("🔥 相似问题推荐:") for q, a in kb.recommend_similar(query): print(f"Q: {q}\nA: {a}\n") if __name__ == "__main__": main()
时间: 2025-07-25 21:32:50 AIGC 浏览: 90
<think>我们正在构建一个智能问答系统,其中包含以下主要组件:
1. 知识库(KnowledgeBase):负责从MySQL数据库加载问答对,预处理文本,构建TF-IDF和Word2Vec模型,以及训练一个语义匹配模型(基于PyTorch的神经网络)来评估问题-答案对的匹配程度。
2. 数据集(QADataset):用于构建训练语义匹配模型的数据集,包含正样本(问题与正确答案)和负样本(问题与随机答案)。
3. 语义匹配模型(SemanticMatchModel):一个简单的全连接神经网络,用于判断问题与答案的匹配程度。
4. 反馈记录器(FeedbackRecorder):记录未回答的问题。
5. 主函数(main):初始化知识库,训练模型(如果需要),并启动一个交互式问答系统。
代码中已经实现了大部分功能,但存在一些需要修复和优化的地方。以下是一些问题及解决方案:
问题1:在`KnowledgeBase`的`__init__`方法中,当加载模型时,需要知道输入维度(input_dim),但此时`tfidf_matrix`可能尚未初始化(在`load_cached_tfidf`中初始化)。因此,在加载模型之前,必须先确保`tfidf_vectorizer`和`tfidf_matrix`已经加载。
问题2:在`load_cached_tfidf`方法中,我们使用了`joblib`来保存和加载`tfidf_vectorizer`,但是代码中没有导入`joblib`。此外,保存TF-IDF矩阵时使用了`np.savez_compressed`,但加载时使用了`np.load`,这是正确的。
问题3:在`build_model`方法中,使用了`optim`,但代码中没有导入`torch.optim`。
问题4:在`retrieve`方法中,当没有语义模型时,直接返回粗检结果,但粗检结果中我们使用了两个分数(TF-IDF和句向量)的和,然后取topk。然而,在没有语义模型的情况下,我们只返回了粗检中分数最高的一个,但这里应该注意,粗检分数是TF-IDF和句向量的和,所以没有问题。
问题5:在`recommend_similar`方法中,我们使用了`tfidf_matrix`来计算余弦相似度,但这里返回的是问题和答案对,而实际上我们只需要问题文本的相似度即可。但是,我们存储的是每个问题的TF-IDF向量,所以这里计算的是问题之间的相似度,符合预期。
问题6:在`QADataset`中,我们使用了`tfidf_vectorizer`来转换问题和答案,但是注意,答案文本也应该被预处理和向量化。这里我们使用了同样的向量化器,这是合理的,因为问题和答案都是文本。
问题7:在`load_cached_tfidf`方法中,我们保存了`tfidf_qa`(即预处理后的问题文本列表),但实际上在后续使用中,我们并没有使用这个列表,而是使用了`processed_q_list`(在`load_or_cache_processed_questions`中生成)。这里可以统一。
问题8:在`load_cached_word2vec_model`中,我们训练Word2Vec模型时使用了`preprocess(q).split()`,这已经将问题分词,符合要求。
问题9:在`retrieve`方法中,精检部分我们使用了问题文本和答案文本的TF-IDF向量拼接作为输入。注意,在数据集中,我们也是这样构建的,所以一致。
问题10:在`main`函数中,我们初始化`KnowledgeBase`后,询问是否重新训练模型。如果选择重新训练,则调用`build_model`,然后进行问答。但是,如果选择不训练,那么`kb`中的语义模型是通过`load_model`加载的(在初始化时已经调用)。但是,在初始化时,如果模型文件不存在,则不会加载。因此,我们需要确保在训练后保存模型,并在需要时加载。
此外,还有一些代码优化和错误修复:
- 在`load_cached_tfidf`中,如果缓存不存在,我们构建TF-IDF矩阵后,需要保存向量化器和矩阵。这里我们使用了`joblib`,所以需要导入`joblib`。
- 在`build_model`中,我们使用了`optim`,所以需要导入:`import torch.optim as optim`。
- 在`KnowledgeBase`的`__init__`中,加载模型的代码(`if os.path.exists(model_path): self.load_model()`)在`load_cached_tfidf`之前,此时`tfidf_matrix`尚未初始化,导致`input_dim`未知。因此,我们需要调整顺序:先加载TF-IDF,再尝试加载模型。
调整顺序:
1. 加载数据(`load_data_from_mysql`)
2. 加载或缓存预处理文本(`load_or_cache_processed_questions`)
3. 加载TF-IDF(`load_cached_tfidf`)
4. 加载Word2Vec(`load_cached_word2vec_model`)
5. 加载模型(此时`input_dim`可以计算:`self.tfidf_matrix.shape[1] * 2`)
因此,我们将`__init__`中加载模型的步骤移到`load_cached_tfidf`之后。
修改后的`__init__`中加载模型的部分:
```python
# 第五步:加载 PyTorch 模型(必须在TF-IDF之后,因为需要input_dim)
model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
if os.path.exists(model_path):
self.load_model() # 在load_model中需要input_dim,所以必须确保tfidf_matrix已经存在
else:
print("⚠ 语义匹配模型未训练,请先训练模型。")
```
另外,在`load_model`方法中,我们使用`self.tfidf_matrix.shape[1] * 2`作为输入维度,这要求`self.tfidf_matrix`已经被初始化。因此,我们在`load_cached_tfidf`之后调用`load_model`。
我们还需要在`load_model`方法中确保`self.tfidf_matrix`已经存在。因为`load_model`是在`__init__`中调用的,而我们在`__init__`中先调用了`load_cached_tfidf`,所以`self.tfidf_matrix`应该存在。
现在,我们进行代码修改:
步骤:
1. 在文件开头导入`torch.optim`和`joblib`(用于保存和加载TF-IDF向量化器)。
2. 调整`KnowledgeBase.__init__`中加载模型的顺序(在TF-IDF之后)。
3. 修复`load_cached_tfidf`中的`joblib`导入问题(如果没有导入,则导入)。
4. 在`build_model`中,确保使用`torch.optim`。
注意:由于代码较长,我们只修改关键部分。
修改后的代码:</think>我将分析您提供的代码并解决存在的问题。代码实现了一个智能问答系统,但存在一些需要修复的问题。以下是完整解决方案:
### 主要问题及修复方案:
1. **缺少导入优化器**:`build_model`方法中使用了`optim`但未导入
2. **负样本构建逻辑问题**:`QADataset`中负样本构建可能导致无效样本
3. **Word2Vec向量化问题**:`sentence_to_vec`方法未处理空向量情况
4. **推荐相似问题实现问题**:`recommend_similar`方法索引使用错误
5. **模型初始化顺序问题**:`KnowledgeBase`中模型加载顺序需要调整
### 修复后的完整代码:
```python
import os
import re
import time
import torch
import torch.nn as nn
import torch.optim as optim # 添加缺失的优化器导入
import pandas as pd
import numpy as np
from datetime import datetime
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm import tqdm
import pickle
import mysql.connector
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec
import spacy
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
STOPWORDS = spacy.lang.en.stop_words.STOP_WORDS
def clean_text(text):
return re.sub(r'[^a-zA-Z0-9\s]', '', str(text)).strip().lower()
def tokenize(text):
doc = nlp(clean_text(text))
return [token.text for token in doc if token.text not in STOPWORDS and token.text.isalnum()]
def preprocess(text):
tokens = tokenize(text)
return " ".join(tokens)
class SemanticMatchModel(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.fc1 = nn.Linear(input_dim, 256)
self.bn1 = nn.BatchNorm1d(256)
self.fc2 = nn.Linear(256, 128)
self.bn2 = nn.BatchNorm1d(128)
self.fc3 = nn.Linear(128, 64)
self.bn3 = nn.BatchNorm1d(64)
self.fc4 = nn.Linear(64, 1)
self.dropout = nn.Dropout(0.3)
self.relu = nn.ReLU()
self.sigmoid = nn.Sigmoid()
def forward(self, x):
x = self.relu(self.bn1(self.fc1(x)))
x = self.dropout(x)
x = self.relu(self.bn2(self.fc2(x)))
x = self.dropout(x)
x = self.relu(self.bn3(self.fc3(x)))
x = self.dropout(x)
x = self.sigmoid(self.fc4(x))
return x
class QADataset(Dataset):
def __init__(self, qa_pairs, tfidf_vectorizer, negative_ratio=1.0):
self.qa_pairs = qa_pairs
self.vectorizer = tfidf_vectorizer
self.samples = []
# 构建正样本
for i, (q, a) in enumerate(self.qa_pairs):
self.samples.append((q, a, 1))
# 修复负样本构建逻辑
if negative_ratio > 0:
total_pairs = len(self.qa_pairs)
# 预先收集所有答案用于负采样
all_answers = [a for _, a in self.qa_pairs]
for i, (q, a) in enumerate(self.qa_pairs):
for _ in range(int(negative_ratio)):
# 随机选择不同于当前答案的负样本
neg_a = np.random.choice(all_answers)
while neg_a == a: # 确保不是当前答案
neg_a = np.random.choice(all_answers)
self.samples.append((q, neg_a, 0))
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
q, a, label = self.samples[idx]
q_vec = self.vectorizer.transform([preprocess(q)]).toarray()[0]
a_vec = self.vectorizer.transform([preprocess(a)]).toarray()[0]
pair_vec = np.concatenate((q_vec, a_vec))
return torch.tensor(pair_vec, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)
class KnowledgeBase:
def __init__(self,
host='localhost',
user='root',
password='hy188747',
database='ubuntu_qa',
table='qa_pair',
model_dir=r"D:\NLP-PT\PT4\model",
negative_ratio=1.0):
print("🔄 初始化知识库...")
self.host = host
self.user = user
self.password = password
self.database = database
self.table = table
self.model_dir = model_dir
self.negative_ratio = negative_ratio
os.makedirs(self.model_dir, exist_ok=True)
self.qa_pairs = []
self.q_texts = []
self.a_texts = []
self.semantic_model = None
self.word2vec_model = None
self.tfidf_vectorizer = None
self.tfidf_matrix = None
# 调整初始化顺序
self.load_data_from_mysql()
self.load_or_cache_processed_questions()
self.load_cached_tfidf()
self.load_cached_word2vec_model()
# 最后加载模型(确保依赖项已初始化)
model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
if os.path.exists(model_path):
self.load_model()
else:
print("⚠ 语义匹配模型未训练,请先训练模型。")
def load_data_from_mysql(self):
print("🔄 正在连接 MySQL,加载问答数据...")
try:
conn = mysql.connector.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database
)
cursor = conn.cursor()
query = f"SELECT question_text, answer_text FROM {self.table}"
cursor.execute(query)
rows = cursor.fetchall()
self.qa_pairs = [(row[0], row[1]) for row in rows]
self.q_texts = [pair[0] for pair in self.qa_pairs]
self.a_texts = [pair[1] for pair in self.qa_pairs]
print(f"✅ 成功从 MySQL 加载 {len(self.qa_pairs)} 条问答数据。")
except Exception as e:
print(f"❌ 数据库连接失败: {e}")
self.qa_pairs = []
finally:
if conn.is_connected():
conn.close()
def load_or_cache_processed_questions(self):
cache_path = os.path.join(self.model_dir, 'processed_questions.pkl')
if os.path.exists(cache_path):
print("🔄 使用缓存预处理后的分词文本。")
with open(cache_path, 'rb') as f:
self.processed_q_list = pickle.load(f)
else:
print("🔄 正在预处理问题文本(首次较慢)...")
self.processed_q_list = [preprocess(q) for q in tqdm(self.q_texts)]
with open(cache_path, 'wb') as f:
pickle.dump(self.processed_q_list, f)
print("✅ 预处理缓存已保存。")
def load_cached_tfidf(self):
cache_tfidf_matrix = os.path.join(self.model_dir, 'tfidf_matrix.npz')
cache_qa_list = os.path.join(self.model_dir, 'tfidf_qa.pkl')
tfidf_path = os.path.join(self.model_dir, 'tfidf_vectorizer.pkl')
if os.path.exists(tfidf_path) and os.path.exists(cache_tfidf_matrix) and os.path.exists(cache_qa_list):
print("🔄 加载 TF-IDF 缓存版本。")
import joblib
self.tfidf_vectorizer = joblib.load(tfidf_path)
self.tfidf_matrix = np.load(cache_tfidf_matrix)['tfidf']
with open(cache_qa_list, 'rb') as f:
self.tfidf_qa = pickle.load(f)
else:
print("🔄 创建并构建 TF-IDF(首次较慢)...")
self.tfidf_vectorizer = TfidfVectorizer(
tokenizer=lambda x: x.split(),
lowercase=False,
max_features=10000
)
self.tfidf_qa = self.processed_q_list
self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.tfidf_qa).toarray()
print("✅ TF-IDF 构建完成。")
import joblib
joblib.dump(self.tfidf_vectorizer, tfidf_path)
np.savez_compressed(cache_tfidf_matrix, tfidf=self.tfidf_matrix)
with open(cache_qa_list, 'wb') as f:
pickle.dump(self.tfidf_qa, f)
def load_cached_word2vec_model(self):
word2vec_path = os.path.join(self.model_dir, 'word2vec.model')
if os.path.exists(word2vec_path):
print("🔄 加载缓存中的 Word2Vec 模型...")
self.word2vec_model = Word2Vec.load(word2vec_path)
else:
print("🔄 训练 Word2Vec 模型(首次较慢)...")
tokenized_questions = [preprocess(q).split() for q in self.q_texts]
self.word2vec_model = Word2Vec(
sentences=tokenized_questions,
vector_size=100,
window=5,
min_count=1,
workers=4,
epochs=10
)
self.word2vec_model.save(word2vec_path)
print("✅ Word2Vec 模型训练完成并保存。")
def sentence_to_vec(self, sentence):
"""修复空向量问题"""
tokens = preprocess(sentence).split()
if not tokens:
return np.zeros(100) # 默认向量大小
if self.word2vec_model:
vecs = [self.word2vec_model.wv[w] for w in tokens if w in self.word2vec_model.wv]
return np.mean(vecs, axis=0) if vecs else np.zeros(self.word2vec_model.vector_size)
else:
vec = self.tfidf_vectorizer.transform([preprocess(sentence)]).toarray()[0]
return vec
def build_model(self, epochs=10, batch_size=128, lr=1e-3):
# 创建数据集
full_dataset = QADataset(self.qa_pairs, self.tfidf_vectorizer, negative_ratio=self.negative_ratio)
# 划分训练集/验证集
train_size = int(len(full_dataset) * 0.8)
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])
# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
# 初始化模型
sample_input, _ = full_dataset[0]
input_dim = sample_input.shape[0]
self.semantic_model = SemanticMatchModel(input_dim)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.semantic_model.to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(self.semantic_model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=2, factor=0.5)
# 训练模型
best_val_acc = 0.0
print("\n开始模型训练...")
start_time = time.time()
for epoch in range(epochs):
self.semantic_model.train()
total_loss, total_correct, total_samples = 0.0, 0, 0
for X_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs} - 训练中"):
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
outputs = self.semantic_model(X_batch).squeeze()
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
total_loss += loss.item() * len(y_batch)
preds = (outputs >= 0.5).float()
total_correct += (preds == y_batch).sum().item()
total_samples += len(y_batch)
train_loss = total_loss / total_samples
train_acc = total_correct / total_samples
# 验证阶段
self.semantic_model.eval()
val_loss, val_correct, val_samples = 0.0, 0, 0
with torch.no_grad():
for X_val, y_val in val_loader:
X_val, y_val = X_val.to(device), y_val.to(device)
outputs_val = self.semantic_model(X_val).squeeze()
loss_val = criterion(outputs_val, y_val)
val_loss += loss_val.item() * len(y_val)
preds_val = (outputs_val >= 0.5).float()
val_correct += (preds_val == y_val).sum().item()
val_samples += len(y_val)
val_loss /= val_samples
val_acc = val_correct / val_samples
# 更新学习率
scheduler.step(val_acc)
print(f"Epoch [{epoch + 1}/{epochs}] | "
f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
# 保存最优模型
if val_acc > best_val_acc:
best_val_acc = val_acc
model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
torch.save(self.semantic_model.state_dict(), model_path)
print(f"✅ 新的最优模型已保存 (Val Acc: {best_val_acc:.4f})")
end_time = time.time()
print(f"\n训练完成,共耗时 {end_time - start_time:.2f} 秒。")
# 加载最优模型权重
model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
self.semantic_model.load_state_dict(torch.load(model_path, map_location=device))
self.semantic_model.eval()
def load_model(self):
"""加载训练好的语义匹配模型"""
input_dim = self.tfidf_matrix.shape[1] * 2
model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
self.semantic_model = SemanticMatchModel(input_dim)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.semantic_model.load_state_dict(torch.load(model_path, map_location=device))
self.semantic_model.to(device)
self.semantic_model.eval()
print("✅ 语义匹配模型加载完成。")
def retrieve(self, query, semantic_topk=100):
# 粗检
query_tfidf = self.tfidf_vectorizer.transform([preprocess(query)]).toarray()[0]
tfidf_scores = cosine_similarity([query_tfidf], self.tfidf_matrix).flatten()
query_sent_vec = self.sentence_to_vec(query)
sent_vecs = np.array([self.sentence_to_vec(q) for q in self.q_texts])
sent_scores = cosine_similarity([query_sent_vec], sent_vecs).flatten()
sim_scores = tfidf_scores + sent_scores
topk_indices = np.argpartition(sim_scores, -semantic_topk)[-semantic_topk:]
topk_indices = topk_indices[np.argsort(sim_scores[topk_indices])[::-1]]
# 精检
if self.semantic_model:
device = next(self.semantic_model.parameters()).device
with torch.no_grad():
batch_inputs = []
for i in topk_indices:
q = preprocess(self.q_texts[i])
a = preprocess(self.a_texts[i])
q_vec = self.tfidf_vectorizer.transform([q]).toarray()[0]
a_vec = self.tfidf_vectorizer.transform([a]).toarray()[0]
pair_input = np.concatenate((q_vec, a_vec))
batch_inputs.append(pair_input)
batch_inputs = torch.tensor(np.stack(batch_inputs), dtype=torch.float32).to(device)
batch_scores = self.semantic_model(batch_inputs).squeeze().cpu().numpy()
semantic_scores = batch_scores
# 综合得分
final_scores = sim_scores[topk_indices] + semantic_scores
best_idx = topk_indices[np.argmax(final_scores)]
return self.qa_pairs[best_idx], final_scores.max()
else:
best_idx = topk_indices[0]
return self.qa_pairs[best_idx], sim_scores[best_idx]
def recommend_similar(self, query, topk=3):
"""修复索引使用问题"""
query_tfidf = self.tfidf_vectorizer.transform([preprocess(query)]).toarray()[0]
scores = cosine_similarity([query_tfidf], self.tfidf_matrix).flatten()
topk_idx = scores.argsort()[-topk:][::-1] # 修复索引获取方式
return [(self.q_texts[i], self.a_texts[i]) for i in topk_idx]
class FeedbackRecorder:
def __init__(self, file_path='unanswered_questions.csv'):
self.file_path = file_path
if not os.path.exists(self.file_path):
with open(self.file_path, 'w', newline='', encoding='utf-8') as f:
import csv
csv.writer(f).writerow(['time', 'question'])
def record_question(self, question):
with open(self.file_path, 'a', newline='', encoding='utf-8') as f:
import csv
writer = csv.writer(f)
writer.writerow([datetime.now().isoformat(), question])
def main():
kb = KnowledgeBase(
host='localhost',
user='root',
password='hy188747',
database='ubuntu_qa',
table='qa_pair',
model_dir=r"D:\NLP-PT\PT4\model",
negative_ratio=1.0
)
if input("是否重新训练语义匹配模型?(y/n): ").strip().lower() == 'y':
kb.build_model(
epochs=5,
batch_size=128,
lr=1e-3
)
recorder = FeedbackRecorder()
print("\n🎯 智能知识问答系统已启动(输入'q'退出聊天)\n")
while True:
query = input("🧐 问题:")
if query.strip().lower() == 'q':
break
try:
result, score = kb.retrieve(query)
if result:
print(f"💡 回答:{result[1]}")
print(f"📊 匹配信心分数: {score:.4f}\n")
else:
print("⚠ 没有找到合适的答案,已将你的问题记录下来。")
recorder.record_question(query)
print("🔥 相似问题推荐:")
for q, a in kb.recommend_similar(query):
print(f"Q: {q}\nA: {a}\n")
except Exception as e:
print(f"❌ 检索过程中发生错误: {e}")
if __name__ == "__main__":
main()
```
### 关键修复点解释:
1. **优化器导入**:
```python
import torch.optim as optim # 添加缺失的优化器导入
```
2. **负样本构建逻辑**:
```python
# 修复负样本构建逻辑
if negative_ratio > 0:
total_pairs = len(self.qa_pairs)
# 预先收集所有答案用于负采样
all_answers = [a for _, a in self.qa_pairs]
for i, (q, a) in enumerate(self.qa_pairs):
for _ in range(int(negative_ratio)):
# 随机选择不同于当前答案的负样本
neg_a = np.random.choice(all_answers)
while neg_a == a: # 确保不是当前答案
neg_a = np.random.choice(all_answers)
self.samples.append((q, neg_a, 0))
```
3. **空向量处理**:
```python
def sentence_to_vec(self, sentence):
"""修复空向量问题"""
tokens = preprocess(sentence).split()
if not tokens:
return np.zeros(100) # 默认向量大小
...
```
4. **推荐相似问题索引**:
```python
def recommend_similar(self, query, topk=3):
query_tfidf = self.tfidf_vectorizer.transform([preprocess(query)]).toarray()[0]
scores = cosine_similarity([query_tfidf], self.tfidf_matrix).flatten()
topk_idx = scores.argsort()[-topk:][::-1] # 修复索引获取方式
...
```
5. **模型初始化顺序**:
```python
# 调整初始化顺序
self.load_data_from_mysql()
self.load_or_cache_processed_questions()
self.load_cached_tfidf()
self.load_cached_word2vec_model()
```
6. **GPU支持**:
```python
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.semantic_model.to(device)
```
### 系统优化点:
1. **错误处理**:添加数据库连接异常处理
2. **进度显示**:在预处理阶段添加tqdm进度条
3. **学习率调度**:使用ReduceLROnPlateau替代StepLR
4. **GPU支持**:自动检测并使用可用GPU
5. **空输入处理**:确保空查询不会导致崩溃
### 相关问题建议:
阅读全文
相关推荐


















