
HA-model: A Heterogeneous-Agent Economic Model Implemented in Julia

The title "HA-model" most likely stands for "Heterogeneous Agents Model", an important line of research in macroeconomics and finance, particularly for analyzing economic dynamics and individual decision-making. Heterogeneous-agent models account for the heterogeneity of individuals in an economic system: agents may differ in preferences, assets, income, and other characteristics. In macroeconomics, such models are used to study how different types of economic agents (households, firms) make optimal decisions under uncertainty and asymmetric information.

The description consists of nothing more than "HA-model", possibly for reasons of information protection, so the discussion below is based on the limited information available in the title and tags.

The tag "julia" refers to Julia, a modern high-performance programming language especially well suited to numerical computing, data analysis, and machine learning. Julia is widely used in economics, particularly for simulating and analyzing macro- and microeconomic models; its performance and concise syntax have made it one of the preferred languages for economic model simulation.

"macroeconomics" is the branch of economics that studies aggregate economic activity, including indicators such as GDP, the unemployment rate, and inflation. Within macroeconomics, heterogeneous-agent models make it possible to simulate and explain movements in these aggregate variables more accurately than representative-agent alternatives.

"hank" is presumably short for "Heterogeneous Agent New Keynesian". HANK models combine heterogeneous-agent modeling with New Keynesian macroeconomics and study how heterogeneity in individual behavior affects the business cycle. In these models, consumers, firms, and other agents make different decisions according to their own preferences, constraints, and expectations, and those decisions in turn feed back into the behavior of the economy as a whole.

"heterogenous-agents-model" again denotes the heterogeneous-agent model. It is an important tool for simulating real economic behavior in macroeconomic research: it abandons the traditional assumption that the economy can be treated as a single representative agent and instead treats it as a collection of individuals with differing characteristics and behaviors. With such models, economists can better understand phenomena such as economic crises, widening wealth gaps, and unequal wealth distribution.

"JupyterNotebook" refers to Jupyter Notebook, an open-source web application for creating and sharing documents that combine live code, equations, visualizations, and narrative text. It is popular in data science and economics because its interactive environment suits data analysis, model simulation, and the presentation and sharing of results.

From the file name in the archive listing, "HA-model-main", we can infer that the package contains the core files of a heterogeneous-agent modeling project, and that the main file is probably a Jupyter notebook combining the model's code, data analysis, and presentation of results.

Putting this information together, the key points are:

1. The heterogeneous agents model is an important tool in macroeconomic research that reflects more realistically how individual differences within an economic system shape aggregate economic behavior.
2. Such models are commonly used to study how heterogeneity in individual behavior affects macroeconomic variables such as GDP, unemployment, and inflation, and they have found applications within the New Keynesian framework.
3. Thanks to its strength in numerical computing and data analysis, Julia is widely used to build and simulate heterogeneous-agent models, especially in economic research that requires heavy computation.
4. Jupyter Notebook, as an interactive document tool, lets researchers combine code, explanatory text, mathematical formulas, and charts in one place, making it well suited to presenting and sharing the simulation results and analysis of a heterogeneous-agent model.
5. Simulations based on heterogeneous-agent models help researchers better understand business cycles, wealth distribution, and inequality, and to design economic policy on that basis.
6. In practice, heterogeneous-agent models are usually disciplined by both micro and macro data, so that simulation results reflect real-world economic behavior as closely as possible.
7. Because of their complexity, building and analyzing these models requires a solid command of mathematical statistics, optimization theory, and numerical analysis.
8. Applications of the model in economics continue to broaden, contributing both to theoretical innovation and to the design of actual economic policy.
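Since only the title and tags are available, the following is a minimal sketch, in Julia, of the kind of problem a heterogeneous-agent project of this sort typically solves: households that differ in assets and idiosyncratic income choose savings under a borrowing constraint, the savings policy is computed by value function iteration on an asset grid, and a simulated cross-section approximates the stationary wealth distribution. All parameter values, grids, and names here are illustrative assumptions, not contents of the HA-model-main archive.

```julia
# Minimal heterogeneous-agent consumption–savings sketch (illustrative only;
# parameters, grids, and names are assumptions, not taken from HA-model-main).
using Random, Statistics

const β = 0.96                        # discount factor
const r = 0.03                        # interest rate on savings
const σ = 2.0                         # CRRA risk aversion
u(c) = c > 0 ? (c^(1 - σ) - 1) / (1 - σ) : -Inf

const y = [0.5, 1.5]                  # two-state idiosyncratic income
const P = [0.8 0.2; 0.2 0.8]          # Markov transition probabilities
const na = 200
const agrid = range(0.0, 20.0; length = na)  # asset grid, borrowing limit a ≥ 0

# Solve the household problem by value function iteration on the grid.
function solve_household(; tol = 1e-6, maxit = 5_000)
    V = zeros(na, 2)                  # value function V(a, y)
    policy = ones(Int, na, 2)         # grid index of the optimal a′
    for _ in 1:maxit
        Vnew = similar(V)
        for iy in 1:2, ia in 1:na
            cash = (1 + r) * agrid[ia] + y[iy]
            best, besti = -Inf, 1
            for ja in 1:na
                c = cash - agrid[ja]
                c <= 0 && break       # agrid is increasing, so no feasible a′ beyond this
                val = u(c) + β * (P[iy, 1] * V[ja, 1] + P[iy, 2] * V[ja, 2])
                if val > best
                    best, besti = val, ja
                end
            end
            Vnew[ia, iy] = best
            policy[ia, iy] = besti
        end
        if maximum(abs.(Vnew .- V)) < tol
            return Vnew, policy
        end
        V = Vnew
    end
    error("value function iteration did not converge")
end

# Simulate a cross-section of N households to approximate the stationary
# wealth distribution implied by the savings policy.
function simulate(policy; N = 10_000, T = 500, rng = MersenneTwister(42))
    ia = ones(Int, N)                 # everyone starts at zero assets
    iy = rand(rng, 1:2, N)            # random initial income states
    for _ in 1:T, i in 1:N
        ia[i] = policy[ia[i], iy[i]]               # savings decision
        iy[i] = rand(rng) < P[iy[i], 1] ? 1 : 2    # income transition
    end
    return agrid[ia]
end

V, policy = solve_household()
assets = simulate(policy)
println("mean wealth: ", round(mean(assets); digits = 3))
println("share at the borrowing constraint: ", mean(assets .== 0.0))
```

In a full HANK model, a household block like this is embedded in general equilibrium: prices such as the interest rate and the wage must be consistent with the aggregate savings implied by the simulated distribution, which is where most of the computational burden, and hence the case for a language like Julia, comes from.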

valid_results.append((result["模型名称"], accuracy)) except ValueError: continue if not valid_results: messagebox.showinfo("提示", "没有可用的性能数据") return # 排序 valid_results.sort(key=lambda x: x[1], reverse=True) models, accuracies = zip(*valid_results) # 创建图表 plt.figure(figsize=(10, 5)) bars = plt.barh(models, accuracies, color='#2196F3') plt.xlabel('准确率', fontsize=10) plt.ylabel('模型', fontsize=10) plt.title('模型性能对比', fontsize=12) plt.xlim(0, 1.05) # 添加数值标签 for bar in bars: width = bar.get_width() plt.text( width + 0.01, bar.get_y() + bar.get_height()/2, f'{width:.4f}', ha='left', va='center', fontsize=8 ) plt.tight_layout() plt.show() def save_as_training_sample(self): """保存为训练样本""" if not self.has_drawn: self.status_var.set("请先绘制数字再保存") return img_array = self.preprocess_image() if img_array is None: return # 弹出标签输入窗口 label_window = tk.Toplevel(self.root) label_window.title("输入标签") label_window.geometry("300x150") label_window.transient(self.root) label_window.grab_set() tk.Label( label_window, text="请输入数字标签 (0-9):", font=("Arial", 10) ).pack(pady=10) entry = tk.Entry(label_window, font=("Arial", 12), width=5) entry.pack(pady=5) entry.focus_set() def save_with_label(): try: label = int(entry.get()) if label < 0 or label > 9: raise ValueError("标签必须是0-9的数字") self.custom_data.append((img_array.tolist(), label)) self.status_var.set(f"已保存数字 {label} (共 {len(self.custom_data)} 个样本)") label_window.destroy() except ValueError as e: self.status_var.set(f"保存错误: {str(e)}") tk.Button( label_window, text="保存", command=save_with_label, width=10 ).pack(pady=5) def save_all_training_data(self): """保存全部训练数据""" if not self.custom_data: self.status_var.set("没有训练数据可保存") return file_path = filedialog.asksaveasfilename( defaultextension=".csv", filetypes=[("CSV文件", "*.csv")], initialfile="custom_digits.csv", title="保存训练集" ) if not file_path: return try: with open(file_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow([f'pixel{i}' for i in range(64)] + ['label']) for img_data, label in self.custom_data: writer.writerow(img_data + [label]) self.status_var.set(f"已保存 {len(self.custom_data)} 个样本到 {os.path.basename(file_path)}") except Exception as e: self.status_var.set(f"保存失败: {str(e)}") def load_training_data(self): """加载训练数据""" file_path = filedialog.askopenfilename( filetypes=[("CSV文件", "*.csv")], title="加载训练集" ) if not file_path: return try: self.custom_data = [] with open(file_path, 'r', newline='', encoding='utf-8') as f: reader = csv.reader(f) next(reader) # 跳过标题 for row in reader: if len(row) != 65: continue img_data = [float(pixel) for pixel in row[:64]] label = int(row[64]) self.custom_data.append((img_data, label)) self.status_var.set(f"已加载 {len(self.custom_data)} 个样本") except Exception as e: self.status_var.set(f"加载失败: {str(e)}") def run(self): """运行应用""" self.root.mainloop() if __name__ == "__main__": digits = load_digits() root = tk.Tk() app = HandwritingBoard(root, ModelFactory, digits) app.run() 基于此代码,在其中做好大量注释,同时要明确代码的分区功能,要显示明白,让刚学python的同学要能看懂。
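The request above asks for beginner-oriented comments and clearly marked sections. A minimal sketch of one common convention, banner comments delimiting functional regions plus short inline comments, applied to the program's entry point; the section names and numbering are illustrative additions, not part of the original code:

```python
# =====================================================
# Section 1: Imports - GUI toolkit and the digits dataset
# =====================================================
import tkinter as tk                       # builds the window and widgets
from sklearn.datasets import load_digits   # 8x8 handwritten-digit dataset

# =====================================================
# Section 2: Program entry point
# =====================================================
if __name__ == "__main__":
    digits = load_digits()     # load the reference dataset once
    root = tk.Tk()             # create the main application window
    # HandwritingBoard (defined earlier in the file) wires the GUI
    # to the model factory
    app = HandwritingBoard(root, ModelFactory, digits)
    app.run()                  # hand control to the Tk event loop
```

The same banner-plus-inline-comment pattern can be repeated for the model-factory, GUI-layout, drawing, preprocessing, and persistence sections of the full program.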


```text
RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call,
so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
```

```python
!pip install transformers datasets torch rouge-score matplotlib

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizerFast
import time
import numpy as np
from datasets import load_dataset
from rouge_score import rouge_scorer
import matplotlib.pyplot as plt
from IPython.display import clear_output

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")

# Data preprocessing (strictly filter out invalid samples)
class SummaryDataset(Dataset):
    def __init__(self, dataset_split, tokenizer, max_article_len=384,
                 max_summary_len=96, subset_size=0.01):
        self.tokenizer = tokenizer
        self.max_article_len = max_article_len
        self.max_summary_len = max_summary_len
        self.subset = dataset_split.select(range(int(len(dataset_split) * subset_size)))
        # Strictly filter out invalid samples
        self.articles = []
        self.summaries = []
        self.vocab = set(tokenizer.vocab.keys())
        for item in self.subset:
            article = item['article'].strip()
            summary = item['highlights'].strip()
            if len(article) > 20 and len(summary) > 10:
                article_tokens = tokenizer.tokenize(article)
                summary_tokens = tokenizer.tokenize(summary)
                if all(t in self.vocab for t in article_tokens) and \
                   all(t in self.vocab for t in summary_tokens):
                    self.articles.append(article)
                    self.summaries.append(summary)
        self.pad_token_id = tokenizer.pad_token_id
        self.unk_token_id = tokenizer.unk_token_id

    def __len__(self):
        return len(self.articles)

    def __getitem__(self, idx):
        src = self.tokenizer(self.articles[idx], max_length=self.max_article_len,
                             truncation=True, padding='max_length',
                             return_tensors='pt', add_special_tokens=True)
        tgt = self.tokenizer(self.summaries[idx], max_length=self.max_summary_len,
                             truncation=True, padding='max_length',
                             return_tensors='pt', add_special_tokens=True)
        tgt_labels = tgt['input_ids'].squeeze()
        tgt_labels[tgt_labels == self.pad_token_id] = -100  # ignore padding in the loss
        tgt_labels[tgt_labels >= len(self.tokenizer.vocab)] = self.unk_token_id  # clamp invalid ids
        return {
            'input_ids': src['input_ids'].squeeze(),
            'attention_mask': src['attention_mask'].squeeze(),
            'labels': tgt_labels
        }

# Basic Seq2Seq model
class BasicEncoder(nn.Module):
    def __init__(self, vocab_size, emb_dim=128, hidden_dim=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.gru = nn.GRU(emb_dim, hidden_dim, num_layers=2,
                          batch_first=True, bidirectional=True)
        self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, hidden = self.gru(embedded)
        # Take the second layer's bidirectional hidden states
        forward_hidden = hidden[-2, :, :]   # layer-2 forward
        backward_hidden = hidden[-1, :, :]  # layer-2 backward
        hidden = torch.cat([forward_hidden, backward_hidden], dim=1)  # (batch, 2*hidden_dim)
        hidden = self.fc_hidden(hidden).unsqueeze(0)  # (1, batch, hidden_dim)
        return hidden

class BasicDecoder(nn.Module):
    def __init__(self, vocab_size, emb_dim=128, hidden_dim=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.gru = nn.GRU(emb_dim + hidden_dim, hidden_dim, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2 + emb_dim, vocab_size)

    def forward(self, input_ids, hidden, context):
        input_embedded = self.embedding(input_ids.unsqueeze(1))  # (batch, 1, emb_dim)
        input_combined = torch.cat([input_embedded, context.unsqueeze(1)],
                                   dim=2)  # (batch, 1, emb_dim+hidden_dim)
        output, hidden = self.gru(input_combined, hidden)  # (batch, 1, hidden_dim)
        output = output.squeeze(1)  # (batch, hidden_dim)
        combined = torch.cat([output, context, input_embedded.squeeze(1)],
                             dim=1)  # (batch, 2*hidden_dim+emb_dim)
        logits = self.fc(combined)
        return logits, hidden

class BasicSeq2Seq(nn.Module):
    def __init__(self, vocab_size, emb_dim=128, hidden_dim=256):
        super().__init__()
        self.encoder = BasicEncoder(vocab_size, emb_dim, hidden_dim)
        self.decoder = BasicDecoder(vocab_size, emb_dim, hidden_dim)
        self.device = device
        self.sos_token_id = 101  # [CLS]
        self.eos_token_id = 102  # [SEP]
        self.unk_token_id = 100  # [UNK]

    def forward(self, src, tgt):
        hidden = self.encoder(src)
        context = hidden.squeeze(0)
        batch_size, tgt_len = tgt.size()
        outputs = torch.zeros(batch_size, tgt_len, self.decoder.fc.out_features).to(device)
        input_ids = tgt[:, 0]
        for t in range(1, tgt_len):
            logits, hidden = self.decoder(input_ids, hidden, context)
            outputs[:, t] = logits
            input_ids = tgt[:, t]
        return outputs

    def generate(self, src, max_length=80):
        src = src.to(device)
        hidden = self.encoder(src)
        context = hidden.squeeze(0)
        # Corrected generation initialization (note the fix here)
        generated = torch.full((src.size(0), 1), self.sos_token_id, device=device)
        for _ in range(max_length - 1):
            logits, hidden = self.decoder(generated[:, -1], hidden, context)
            next_token = torch.argmax(logits, dim=1, keepdim=True)
            # Discourage punctuation at the very start of a summary
            if generated.size(1) < 5:
                punctuation = [',', '.', ';', ':', '!', '?', "'", '"', '`', '~']
                punct_ids = [self.tokenizer.convert_tokens_to_ids(p) for p in punctuation]
                if next_token.item() in punct_ids:
                    # Fall back to the most common content word
                    next_token = torch.tensor(
                        [[self.tokenizer.convert_tokens_to_ids('the')]], device=device)
            generated = torch.cat([generated, next_token], dim=1)
            if (next_token == self.eos_token_id).all():
                break
        return generated

# Attention-based Seq2Seq model
class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.W = nn.Linear(2 * hidden_dim, hidden_dim)
        self.v = nn.Linear(hidden_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        src_len = encoder_outputs.size(1)
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)  # (batch, src_len, hidden_dim)
        combined = torch.cat([hidden, encoder_outputs], dim=2)
        energy = self.v(torch.tanh(self.W(combined))).squeeze(2)  # (batch, src_len)
        return torch.softmax(energy, dim=1)

class AttnEncoder(nn.Module):
    def __init__(self, vocab_size, emb_dim=128, hidden_dim=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, num_layers=2, batch_first=True,
                            bidirectional=True, dropout=0.1)
        self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)  # fuse the two directions
        self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, (hidden, cell) = self.lstm(embedded)  # outputs: (batch, src_len, 2*hidden_dim)
        # Take the second layer's bidirectional states
        hidden = torch.cat([hidden[-2, :, :], hidden[-1, :, :]], dim=1)  # (batch, 2*hidden_dim)
        cell = torch.cat([cell[-2, :, :], cell[-1, :, :]], dim=1)
        hidden = self.fc_hidden(hidden).unsqueeze(0)  # (1, batch, hidden_dim)
        cell = self.fc_cell(cell).unsqueeze(0)
        return outputs, (hidden, cell)

class AttnDecoder(nn.Module):
    def __init__(self, vocab_size, emb_dim=128, hidden_dim=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.attention = Attention(hidden_dim)
        self.lstm = nn.LSTM(emb_dim + 2 * hidden_dim, hidden_dim,
                            num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_dim + emb_dim, vocab_size)

    def forward(self, input_ids, hidden, cell, encoder_outputs):
        input_embedded = self.embedding(input_ids.unsqueeze(1))  # (batch, 1, emb_dim)
        attn_weights = self.attention(hidden.squeeze(0), encoder_outputs)  # (batch, src_len)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs)  # (batch, 1, 2*hidden_dim)
        lstm_input = torch.cat([input_embedded, context], dim=2)  # (batch, 1, emb_dim+2*hidden_dim)
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))  # (batch, 1, hidden_dim)
        logits = self.fc(torch.cat([output.squeeze(1), input_embedded.squeeze(1)],
                                   dim=1))  # (batch, vocab_size)
        return logits, hidden, cell

class AttnSeq2Seq(nn.Module):
    def __init__(self, vocab_size, emb_dim=128, hidden_dim=256):
        super().__init__()
        self.encoder = AttnEncoder(vocab_size, emb_dim, hidden_dim)
        self.decoder = AttnDecoder(vocab_size, emb_dim, hidden_dim)
        self.device = device
        self.sos_token_id = 101  # [CLS]
        self.eos_token_id = 102  # [SEP]
        self.unk_token_id = 100  # [UNK]

    def forward(self, src, tgt):
        encoder_outputs, (hidden, cell) = self.encoder(src)
        batch_size, tgt_len = tgt.size()
        outputs = torch.zeros(batch_size, tgt_len, self.decoder.fc.out_features).to(device)
        input_ids = tgt[:, 0]
        for t in range(1, tgt_len):
            logits, hidden, cell = self.decoder(input_ids, hidden, cell, encoder_outputs)
            outputs[:, t] = logits
            input_ids = tgt[:, t]
        return outputs

    def generate(self, src, max_length=80):
        encoder_outputs, (hidden, cell) = self.encoder(src)
        # Corrected generation initialization (note the fix here)
        generated = torch.full((src.size(0), 1), self.sos_token_id, device=device)
        for _ in range(max_length - 1):
            logits, hidden, cell = self.decoder(generated[:, -1], hidden, cell, encoder_outputs)
            next_token = torch.argmax(logits, dim=1, keepdim=True)
            # Discourage punctuation at the very start of a summary
            if generated.size(1) < 5:
                punctuation = [',', '.', ';', ':', '!', '?', "'", '"', '`', '~']
                punct_ids = [self.tokenizer.convert_tokens_to_ids(p) for p in punctuation]
                if next_token.item() in punct_ids:
                    # Fall back to the most common content word
                    next_token = torch.tensor(
                        [[self.tokenizer.convert_tokens_to_ids('the')]], device=device)
            generated = torch.cat([generated, next_token], dim=1)
            if (next_token == self.eos_token_id).all():
                break
        return generated

# Transformer model
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

class TransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=128, nhead=8, num_layers=3,
                 dim_feedforward=512, max_len=5000):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        # Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout=0.1)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        # Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout=0.1)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers)
        self.fc = nn.Linear(d_model, vocab_size)
        self.d_model = d_model
        self.sos_token_id = 101  # [CLS]
        self.eos_token_id = 102  # [SEP]

    def _generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')) \
                           .masked_fill(mask == 1, float(0.0))
        return mask

    def forward(self, src, tgt):
        src_mask = None
        tgt_mask = self._generate_square_subsequent_mask(tgt.size(1)).to(device)
        src_key_padding_mask = (src == 0)
        tgt_key_padding_mask = (tgt == 0)
        src = self.embedding(src) * np.sqrt(self.d_model)
        src = self.pos_encoder(src)
        tgt = self.embedding(tgt) * np.sqrt(self.d_model)
        tgt = self.pos_encoder(tgt)
        memory = self.transformer_encoder(src.transpose(0, 1), src_mask, src_key_padding_mask)
        output = self.transformer_decoder(
            tgt.transpose(0, 1), memory, tgt_mask, None,
            tgt_key_padding_mask, src_key_padding_mask
        )
        output = self.fc(output.transpose(0, 1))
        return output

    def generate(self, src, max_length=80):
        src_mask = None
        src_key_padding_mask = (src == 0)
        src = self.embedding(src) * np.sqrt(self.d_model)
        src = self.pos_encoder(src)
        memory = self.transformer_encoder(src.transpose(0, 1), src_mask, src_key_padding_mask)
        batch_size = src.size(0)
        generated = torch.full((batch_size, 1), self.sos_token_id, device=device)
        for i in range(max_length - 1):
            tgt_mask = self._generate_square_subsequent_mask(generated.size(1)).to(device)
            tgt_key_padding_mask = (generated == 0)
            tgt = self.embedding(generated) * np.sqrt(self.d_model)
            tgt = self.pos_encoder(tgt)
            output = self.transformer_decoder(
                tgt.transpose(0, 1), memory, tgt_mask, None,
                tgt_key_padding_mask, src_key_padding_mask
            )
            output = self.fc(output.transpose(0, 1)[:, -1, :])
            next_token = torch.argmax(output, dim=1, keepdim=True)
            generated = torch.cat([generated, next_token], dim=1)
            if (next_token == self.eos_token_id).all():
                break
        return generated

# Training loop
def train_model(model, train_loader, optimizer, criterion, epochs=3):
    model.train()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=1, factor=0.5)
    start_time = time.time()
    for epoch in range(epochs):
        total_loss = 0
        model.train()
        for i, batch in enumerate(train_loader):
            src = batch['input_ids'].to(device)
            tgt = batch['labels'].to(device)
            optimizer.zero_grad()
            outputs = model(src, tgt[:, :-1])
            # Sanity-check the model output
            if torch.isnan(outputs).any():
                print("警告:模型输出包含NaN,跳过此批次")
                continue
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), tgt[:, 1:].reshape(-1))
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)  # gradient clipping
            optimizer.step()
            total_loss += loss.item()
            if (i + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{epochs} | Batch {i+1}/{len(train_loader)} "
                      f"| Loss: {loss.item():.4f}")
        avg_loss = total_loss / len(train_loader)
        scheduler.step(avg_loss)
        print(f"Epoch {epoch+1} | 平均损失: {avg_loss:.4f}")
        torch.cuda.empty_cache()
    total_time = time.time() - start_time
    print(f"训练完成!总耗时: {total_time:.2f}s ({total_time/60:.2f}分钟)")
    return model, total_time

# Evaluation
def evaluate_model(model, val_loader, tokenizer, num_examples=2):
    model.eval()
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    rouge_scores = {'rouge1': [], 'rouge2': [], 'rougeL': []}
    valid_count = 0
    with torch.no_grad():
        for i, batch in enumerate(val_loader):
            src = batch['input_ids'].to(device)
            tgt = batch['labels'].to(device)
            generated = model.generate(src)
            for s, p, t in zip(src, generated, tgt):
                src_txt = tokenizer.decode(s, skip_special_tokens=True)
                pred_txt = tokenizer.decode(p, skip_special_tokens=True)
                true_txt = tokenizer.decode(t[t != -100], skip_special_tokens=True)
                if len(pred_txt.split()) > 3 and len(true_txt.split()) > 3:
                    valid_count += 1
                    if valid_count <= num_examples:
                        print(f"\n原文: {src_txt[:100]}...")
                        print(f"生成: {pred_txt}")
                        print(f"参考: {true_txt[:80]}...")
                        print("-" * 60)
                    if true_txt and pred_txt:
                        scores = scorer.score(true_txt, pred_txt)
                        for key in rouge_scores:
                            rouge_scores[key].append(scores[key].fmeasure)
    if valid_count > 0:
        avg_scores = {key: sum(rouge_scores[key]) / len(rouge_scores[key])
                      for key in rouge_scores}
        print(f"\n评估结果 (基于{valid_count}个样本):")
        print(f"ROUGE-1: {avg_scores['rouge1']*100:.2f}%")
        print(f"ROUGE-2: {avg_scores['rouge2']*100:.2f}%")
        print(f"ROUGE-L: {avg_scores['rougeL']*100:.2f}%")
    else:
        print("警告:未生成有效摘要")
        avg_scores = {key: 0.0 for key in rouge_scores}
    return avg_scores

# Visualize model performance
def visualize_model_performance(model_names, train_times, rouge_scores):
    plt.figure(figsize=(15, 6))
    # Training-time comparison
    plt.subplot(1, 2, 1)
    bars = plt.bar(model_names, train_times)
    plt.title('模型训练时间对比')
    plt.ylabel('时间 (分钟)')
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width() / 2., height,
                 f'{height:.1f} min', ha='center', va='bottom')
    # ROUGE-score comparison
    plt.subplot(1, 2, 2)
    x = np.arange(len(model_names))
    width = 0.25
    plt.bar(x - width, [scores['rouge1'] for scores in rouge_scores], width, label='ROUGE-1')
    plt.bar(x, [scores['rouge2'] for scores in rouge_scores], width, label='ROUGE-2')
    plt.bar(x + width, [scores['rougeL'] for scores in rouge_scores], width, label='ROUGE-L')
    plt.title('模型ROUGE分数对比')
    plt.ylabel('F1分数')
    plt.xticks(x, model_names)
    plt.legend()
    plt.tight_layout()
    plt.savefig('performance_comparison.png')
    plt.show()
    print("性能对比图已保存为 performance_comparison.png")

# Interactive summarization
def interactive_summarization(models, tokenizer, model_names, max_length=80):
    while True:
        print("\n" + "=" * 60)
        print("文本摘要交互式测试 (输入 'q' 退出)")
        print("=" * 60)
        input_text = input("请输入要摘要的文本:\n")
        if input_text.lower() == 'q':
            break
        if len(input_text) < 50:
            print("请输入更长的文本(至少50个字符)")
            continue
        # Tokenize the input once for all models
        inputs = tokenizer(input_text, max_length=384, truncation=True,
                           padding='max_length', return_tensors='pt').to(device)
        print("\n生成摘要中...")
        all_summaries = []
        for i, model in enumerate(models):
            model.eval()
            with torch.no_grad():
                generated = model.generate(inputs["input_ids"])
            summary = tokenizer.decode(generated[0], skip_special_tokens=True)
            all_summaries.append(summary)
            # Print the result
            print(f"\n{model_names[i]} 摘要:")
            print("-" * 50)
            print(summary)
            print("-" * 50)
        print("\n所有模型摘要对比:")
        for i, (name, summary) in enumerate(zip(model_names, all_summaries)):
            print(f"{i+1}. {name}: {summary}")

# Main program
print("加载数据集...")
dataset = load_dataset("cnn_dailymail", "3.0.0")
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')
vocab_size = len(tokenizer.vocab)

print("准备训练数据...")
train_ds = SummaryDataset(dataset['train'], tokenizer, subset_size=0.01)  # use 1% of the data
val_ds = SummaryDataset(dataset['validation'], tokenizer, subset_size=0.01)
train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=0)
val_loader = DataLoader(val_ds, batch_size=8, shuffle=False, num_workers=0)

# Loss function
criterion = nn.CrossEntropyLoss(ignore_index=-100)

# Train the basic Seq2Seq
print("\n" + "=" * 60)
print("训练基础Seq2Seq模型")
print("=" * 60)
basic_model = BasicSeq2Seq(vocab_size).to(device)
trained_basic, basic_time = train_model(basic_model, train_loader, None, criterion, epochs=3)
basic_rouge = evaluate_model(trained_basic, val_loader, tokenizer)

# Train the attention Seq2Seq
print("\n" + "=" * 60)
print("训练注意力Seq2Seq模型")
print("=" * 60)
attn_model = AttnSeq2Seq(vocab_size).to(device)
trained_attn, attn_time = train_model(attn_model, train_loader, None, criterion, epochs=3)
attn_rouge = evaluate_model(trained_attn, val_loader, tokenizer)

# Train the Transformer
print("\n" + "=" * 60)
print("训练Transformer模型")
print("=" * 60)
transformer_model = TransformerModel(vocab_size).to(device)
trained_transformer, transformer_time = train_model(transformer_model, train_loader,
                                                    None, criterion, epochs=3)
transformer_rouge = evaluate_model(trained_transformer, val_loader, tokenizer)

# Compare model performance
print("\n" + "=" * 60)
print("模型性能对比")
print("=" * 60)
model_names = ['基础Seq2Seq', '注意力Seq2Seq', 'Transformer']
train_times = [basic_time / 60, attn_time / 60, transformer_time / 60]
rouge_scores = [basic_rouge, attn_rouge, transformer_rouge]
visualize_model_performance(model_names, train_times, rouge_scores)

# Interactive test
print("\n" + "=" * 60)
print("交互式文本摘要测试")
print("=" * 60)
print("提示:输入一段文本,将同时生成三个模型的摘要结果")
interactive_summarization(
    [trained_basic, trained_attn, trained_transformer],
    tokenizer, model_names
)
```

Please fix the errors and send me back the complete code.
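Before touching the model code it helps to localize the assert. The most likely trigger in the code above is an out-of-range index reaching `nn.Embedding`: `SummaryDataset.__getitem__` writes `-100` into `labels` so the loss can ignore padding, but `train_model` then feeds those same tensors into the models as decoder input (`model(src, tgt[:, :-1])`), so the embedding lookup receives `-100` and fires a device-side assert on the GPU. A minimal sketch of one way to separate the two roles (the helper name is hypothetical, not from the original code):

```python
import torch

def make_decoder_targets(labels: torch.Tensor, pad_token_id: int):
    """Split one label tensor into (embedding-safe input, loss target).

    labels: (batch, tgt_len) with -100 at padded positions.
    """
    decoder_input = labels.clone()
    decoder_input[decoder_input == -100] = pad_token_id  # valid index for nn.Embedding
    return decoder_input, labels  # the loss still skips -100 via ignore_index

# Inside train_model the batch handling would then become, for example:
#   decoder_input, loss_target = make_decoder_targets(
#       batch['labels'].to(device), tokenizer.pad_token_id)
#   outputs = model(src, decoder_input[:, :-1])
#   loss = criterion(outputs.reshape(-1, outputs.size(-1)),
#                    loss_target[:, 1:].reshape(-1))
```

Setting `CUDA_LAUNCH_BLOCKING=1` in the environment before CUDA is initialized, as the error message itself suggests, makes the stack trace point at the failing kernel rather than a later API call. Two further problems are visible by inspection and worth fixing in the same pass: the `generate` methods of `BasicSeq2Seq` and `AttnSeq2Seq` reference `self.tokenizer`, which is never assigned (and call `next_token.item()`, which fails on batched input), and `Attention.W` is declared as `nn.Linear(2 * hidden_dim, hidden_dim)` although it receives `hidden_dim + 2 * hidden_dim` features once the decoder hidden state is concatenated with the bidirectional encoder outputs.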
