
2020 MCM Weekend 1 Problems and Datasets: An Overview

ZIP file | 2.28 MB | Updated: 2025-03-26
Based on the given file information, the following points can be drawn:

Title
1. File naming conventions: "2020_Weekend1_Problems.zip" indicates a package of problems from a 2020 contest weekend (presumably the first weekend of a mathematical modeling competition). The name follows a structured "year_event-period_topic" pattern, which makes the contents easy to identify among many files.

Description
2. The contest: the description's "2020年美赛" refers to the Mathematical Contest in Modeling (MCM), one of the largest mathematical modeling competitions for undergraduates worldwide. It is held once a year, concurrently with its companion Interdisciplinary Contest in Modeling (ICM): MCM sets problems A, B, and C, while ICM sets problems D, E, and F.
3. Problem types: the A, D, and E problems mentioned in the description represent different problem categories. Contest problems typically span theoretical modeling, data analysis, optimization, and policy questions; in recent years, A has been a continuous-modeling problem, D an operations-research/network-science problem, and E an environmental-science problem.
4. Contents: the description states that the archive contains PDF files for the three problems plus the data tables for Problem D. PDF is the usual format for publishing problem statements, and the data tables indicate that D is a data-driven problem, to be solved by analyzing the supplied data.

Tag
5. Algorithms: the "算法" (algorithms) tag signals that algorithms are central to the modeling process, both for data analysis and for building and optimizing models. Contestants accordingly need programming skills in a language such as Python, R, or MATLAB.

File list
6. ZIP format: the single listed file, "2020_Weekend1_Problems.zip", is a ZIP archive. ZIP is a widely used compression format that bundles multiple files into one package for easy transfer and storage, which is why contest materials are usually distributed this way.
7. File organization: since only one archive is listed, it probably contains a subfolder or set of files per problem, for example one folder holding Problem A's PDF and related material. After extraction, a sensible folder layout makes it easy to locate what each problem requires.

In summary, the file bundles the key materials for the contest and touches on basic modeling concepts, file formats and naming conventions, and data analysis. For students preparing for the competition, these points help with organizing materials, understanding the problems, and applying algorithms to solve them.
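As a minimal sketch of points 6 and 7 above, Python's standard-library zipfile module can list an archive's internal layout before extracting it into a dedicated folder (this assumes the archive sits in the current working directory; the guard makes the script a no-op otherwise):

```python
import zipfile
from pathlib import Path

archive = Path("2020_Weekend1_Problems.zip")  # the package named on this page

if archive.exists():
    with zipfile.ZipFile(archive) as zf:
        # Inspect the member list first to learn the internal structure
        for info in zf.infolist():
            print(f"{info.filename}  ({info.file_size:,} bytes)")
        # Extract into a folder named after the archive, keeping the
        # per-problem subfolders (if any) intact
        zf.extractall(archive.stem)
```

Listing before extracting is a small habit that pays off with contest packages, since it shows immediately which problem each PDF and data table belongs to.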

Uploader: 呆壳兽