
NumPy array concatenation and merging explained: concatenate, append, stack, and more

NumPy is one of the core libraries for scientific computing in Python, and its powerful array handling makes data processing and analysis far more efficient. This article walks through NumPy's array concatenation and merging operations, covering how concatenate, append, stack, hstack, vstack, r_ and c_ are used and what sets them apart.

1. concatenate: np.concatenate joins multiple arrays along an existing axis. It takes a tuple or list of arrays together with an axis argument that determines the direction of the join: axis=0 joins along the rows (the first axis), axis=1 along the columns.

2. append: np.append appends one array to the end of another. By default it first flattens both inputs to 1-D and then joins them; passing axis changes this behaviour, for example axis=0 joins row-wise and axis=1 column-wise (in which case the inputs must have matching dimensions).

3. stack and reshape: np.stack joins arrays along a new axis, and its axis argument specifies where that new dimension is inserted. reshape, by contrast, gives an existing array a new shape without copying the data: it returns a view of the same buffer whenever possible and does not combine arrays at all.

4. hstack and vstack: np.hstack (horizontal stack) and np.vstack (vertical stack) are the common shortcuts for 2-D arrays. hstack joins along the column direction, growing the array's width, while vstack joins along the row direction, growing its height.

5. dstack versus stack: np.dstack joins arrays along the third axis (the depth direction), which is convenient when building up multi-dimensional arrays. stack is the more general tool, since its axis argument lets you insert the new dimension at any position.

6. r_ and c_: np.r_ and np.c_ are index tricks, used with square brackets rather than called like functions. r_ concatenates row-wise along the first axis (comparable to row_stack), and c_ concatenates column-wise along the last axis (comparable to column_stack). They are handy for writing compact concatenation expressions directly inside other code.

7. Dimensions and axes: using these functions well requires a clear picture of dimensions and axes. In NumPy, an ndarray's dimensions describe the levels of its layout: 1-D is a line of values, 2-D a plane, 3-D a volume. An axis is one of those directions; in a 2-D array, axis 0 runs down the rows and axis 1 across the columns. The ndim attribute gives the total number of dimensions, and shape returns the size of the array along each of them.

Mastering these concatenation and merging operations greatly improves productivity in data analysis and programming, especially when handling large amounts of data or building complex data structures. With them, developers can flexibly adjust and combine arrays to get exactly the layout an operation needs.
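The differences between these functions are easiest to see on a small example. The snippet below is a minimal, illustrative sketch (the arrays a and b are made up for demonstration, not taken from the article); the resulting shapes are noted in the comments:

```python
import numpy as np

a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])

# concatenate: join along an existing axis
np.concatenate([a, b], axis=0).shape   # (4, 2) - stacked along the rows
np.concatenate([a, b], axis=1).shape   # (2, 4) - placed side by side

# append: flattens to 1-D unless an axis is given
np.append(a, b).shape                  # (8,)
np.append(a, b, axis=0).shape          # (4, 2) - same as concatenate on axis 0

# stack: join along a NEW axis
np.stack([a, b], axis=0).shape         # (2, 2, 2)

# hstack / vstack / dstack
np.vstack([a, b]).shape                # (4, 2) - vertical (row direction)
np.hstack([a, b]).shape                # (2, 4) - horizontal (column direction)
np.dstack([a, b]).shape                # (2, 2, 2) - stacked along the depth axis

# r_ and c_: index tricks, used with square brackets
np.r_[a, b].shape                      # (4, 2) - row-wise, like vstack
np.c_[a, b].shape                      # (2, 4) - column-wise, like column_stack

# dimensions and axes
a.ndim, a.shape                        # 2, (2, 2)
```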

Related recommendations

Level 1: Concat and Append operations

Task description
This exercise asks you to read two csv files with read_csv(), merge the two datasets, set the Ladder column as the index, and fill the missing values with 0.

Background
In NumPy, merging can be done with np.concatenate, np.stack, np.vstack and np.hstack. Pandas provides pd.concat(), whose syntax is similar to concatenate but which exposes more configuration parameters and is considerably more powerful. Its main parameters are:

- objs: the objects to concatenate; the only required parameter
- axis: the axis to concatenate along; defaults to 0
- join: 'inner' or 'outer' (default 'outer'); decides how the indexes on the other axes are combined, 'inner' taking the intersection and 'outer' the union
- join_axes: specific index objects to use for the other n-1 axes, instead of performing the union/intersection logic
- keys: values associated with the objects being concatenated, used to build a hierarchical index along the concatenation axis; can be a list or array of arbitrary values
- levels: the specific levels to use for each level of the hierarchical index
- names: names for the levels of the resulting hierarchical index, used when keys and/or levels are given
- verify_integrity: check the new axis of the result for duplicates and raise an exception if any are found; the default False allows duplicates
- ignore_index: do not preserve the index along the concatenation axis and generate a fresh integer index instead

pd.concat() can merge one-dimensional Series or DataFrame objects in the simplest way:

# Series merge
import pandas as pd
ser1 = pd.Series(['A', 'B', 'C'], index=[1, 2, 3])
ser2 = pd.Series(['D', 'E', 'F'], index=[4, 5, 6])
pd.concat([ser1, ser2])
Out:
1    A
2    B
3    C
4    D
5    E
6    F
dtype: object

# DataFrame merge; by default the frames are stacked vertically, set concat's axis parameter to 1 to merge horizontally instead
df1 = pd.DataFrame([["A1", "B1"], ["A2", "B2"]], index=[1, 2], columns=["A", "B"])
df2 = pd.DataFrame([["A3", "B3"], ["A4", "B4"]], index=[3, 4], columns=["A", "B"])
pd.concat([df1, df2])
Out:
    A   B
1  A1  B1
2  A2  B2
3  A3  B3
4  A4  B4

Index handling when merging
One of the most important differences between np.concatenate and pd.concat is that Pandas preserves the indexes when merging, even if that leaves duplicates in the result!

df3 = pd.DataFrame([["A1", "B1"], ["A2", "B2"]], index=[1, 2], columns=["A", "B"])
df4 = pd.DataFrame([["A1", "B1"], ["A2", "B2"]], index=[1, 2], columns=["A", "B"])
pd.concat([df3, df4])
Out:
    A   B
1  A1  B1
2  A2  B2
1  A1  B1
2  A2  B2

If you want pd.concat() to check the result for duplicated indexes, use the verify_integrity parameter. Setting it to True makes the merge raise an exception as soon as the indexes overlap:

try:
    pd.concat([df3, df4], verify_integrity=True)
except ValueError as e:
    print("ValueError:", e)
Out:
ValueError: Indexes have overlapping values: [1, 2]

Sometimes the index does not matter and can simply be ignored during the merge, which is done by setting ignore_index to True:

pd.concat([df3, df4], ignore_index=True)
Out:
    A   B
0  A1  B1
1  A2  B2
2  A1  B1
3  A2  B2

Another way to deal with duplicated indexes is to use the keys parameter to attach a label to each data source, so the result carries a hierarchical (multi-level) index:

pd.concat([df3, df4], keys=['x', 'y'])
Out:
      A   B
x 1  A1  B1
  2  A2  B2
y 1  A1  B1
  2  A2  B2

The join and join_axes parameters
The simple examples above all have one thing in common: the objects being merged share the same column names. When the inputs have different columns, the join parameter decides what happens to the non-overlapping ones.
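As a brief, hedged illustration of the join behaviour introduced above (the original text is cut off at this point; note that join_axes was removed in newer pandas releases, so only join is shown, and df5/df6 are hypothetical frames, not part of the exercise):

```python
import pandas as pd

# Two frames whose columns only partially overlap (B and C are shared)
df5 = pd.DataFrame([["A1", "B1", "C1"], ["A2", "B2", "C2"]],
                   index=[1, 2], columns=["A", "B", "C"])
df6 = pd.DataFrame([["B3", "C3", "D3"], ["B4", "C4", "D4"]],
                   index=[3, 4], columns=["B", "C", "D"])

# join='outer' (default): keep the union of the columns, filling the gaps with NaN
print(pd.concat([df5, df6]))                # columns A, B, C, D

# join='inner': keep only the columns common to both inputs, no NaNs introduced
print(pd.concat([df5, df6], join='inner'))  # columns B, C
```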

# 请替换为实际值 # 提示用户输入图片路径 image_path = input("请输入您要测试的图片路径(例如:'test_image.tiff'):") # 进行预测 #predicted = predict_test_image(model, image_path, min_wavelength, wavelength_range) predicted = predict_test_image(model, image_path) print(f"预测波长: {predicted:.6f} 纳米") print("\n程序执行完成。") if __name__ == "__main__": # 设置TensorFlow日志级别 os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # 确保必要的库已安装 try: import imageio from skimage.transform import resize from skimage.filters import gaussian, threshold_otsu from skimage.feature import canny from skimage.measure import regionprops, label except ImportError: print("安装必要的库...") import subprocess subprocess.run([sys.executable, "-m", "pip", "install", "imageio", "scikit-image"]) import imageio from skimage.transform import resize from skimage.filters import gaussian, threshold_otsu from skimage.feature import canny from skimage.measure import regionprops, label # 执行主函数 main()

filetype
