
```python
import io
import tkinter as tk
from tkinter import *
from tkinter import messagebox
from tkinter import font
import pandas as pd
from pandas import DataFrame
import numpy as np
import matplotlib.pyplot as plt

clickable = 1
data = None
row = 0
column = 0
changing_label = None
changing_entry = None
rroot = None
addr = ''
aa = 0
bb = 0
index = 0
```

Posted: 2023-11-26 20:03:06 · Views: 192
This is a fragment of Python code that imports the tkinter, pandas, numpy, and matplotlib.pyplot libraries. It defines a set of module-level variables but contains no concrete implementation. The purpose of `clickable` is unclear; `data` is presumably a container for loaded data; `row` and `column` likely track a cell position; `changing_label` and `changing_entry` appear to hold references to a label and an entry widget that are updated dynamically. The roles of `rroot` and `addr` are not evident, and `aa`, `bb`, and `index` are simply initialized to 0 with no obvious meaning.
Related questions

```python
import pandas as pd
import tkinter as tk
from tkinter import messagebox

def dataframe():
    filename = r'500强企业数据.csv'
    df = pd.read_csv(filename, encoding='ansi')

    # Collapse '~'-delimited valuation ranges to their (integer) midpoint
    def to_average(val):
        if isinstance(val, str) and '~' in val:
            start, end = val.split('~')
            return str(int((float(start) + float(end)) / 2))
        return val

    df['估值(亿元)'] = df['估值(亿元)'].apply(to_average)

    # Drop rows with missing values and keep the five columns of interest
    df1 = df.dropna()
    newIndex = df1[['序号', '企业名称', '估值(亿元)', '所属国家', '赛道']]
    newIndex.to_csv('处理后500强企业数据.csv', encoding='ansi', index=False)
    messagebox.showinfo('保存文件', '文件:处理后500强企业数据.csv保存成功!')
```

This code uses pandas and tkinter to read and process a data file named "500强企业数据.csv" and saves the result to a new file, "处理后500强企业数据.csv". The file holds company records: an index, company name, valuation, country, and sector. During processing, the code first converts any valuation given as a "~"-separated range into its midpoint, then drops every row containing missing data, and finally writes the cleaned result to the new file. On success, a message box confirms that the file was saved.
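The "~" range-to-midpoint step can be expressed as a small standalone function. A minimal sketch on a toy DataFrame that uses the same "~" convention as the 估值(亿元) column (the column name and sample values here are illustrative):

```python
import pandas as pd

def average_range(val):
    """Collapse a '100~200'-style range string to its midpoint; pass plain values through."""
    if isinstance(val, str) and '~' in val:
        start, end = val.split('~')
        return (float(start) + float(end)) / 2
    return float(val)

# Toy frame standing in for the real CSV column
df = pd.DataFrame({'估值(亿元)': ['100~200', '350', '80~120']})
df['估值(亿元)'] = df['估值(亿元)'].apply(average_range)
print(df['估值(亿元)'].tolist())  # [150.0, 350.0, 100.0]
```

Using `Series.apply` writes the converted values back into the column, which a loop over a detached `list(df[...])` copy would not.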

```python
import tkinter as tk
# import numpy as np
# from logging import root
import pandas as pd
from tkinter import messagebox
import tushare as ts
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import mplfinance as mpf

# Initialize the Tushare pro_api (replace with your own Tushare token)
pro = ts.pro_api('8dca39c0ade70178ef783dfb30f27c2428fcd88e3b90e652c3743091')

def get_and_plot_data():
    ts_code = entry.get()
    if not ts_code:
        messagebox.showwarning("警告", "请输入有效的ts_code")
        return
    try:
        # Fetch daily bars
        df = pro.daily(ts_code=ts_code, start_date='20080101', end_date='20250317')
        if df.empty:
            messagebox.showwarning("警告", "未获取到该股票的数据")
            return
        # Parse dates and set them as the index
        df['trade_date'] = pd.to_datetime(df['trade_date'])
        df.set_index('trade_date', inplace=True)
        # Rename columns to the capitalized names mplfinance expects
        df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close'},
                  inplace=True)
        # Candlestick chart with a volume sub-panel
        fig, ax = plt.subplots(figsize=(12, 6))
        ap = mpf.make_addplot(df['vol'], panel=1, color='g', secondary_y=True)
        mpf.plot(df, type='candle', style='yahoo', title='股票交易k线图',
                 volume=True, addplot=ap, show_nontrading=True)
    except Exception as e:
        messagebox.showerror("错误", f"获取或绘制数据时出错: {str(e)}")

root = tk.Tk()
root.title("股票数据可视化")
label = tk.Label(root, text="请输入ts_code:")
label.grid(row=0, column=0)
entry = tk.Entry(root)
entry.grid(row=0, column=1)
button = tk.Button(root, text="查询并绘图", command=get_and_plot_data)
button.grid(row=1, column=0, columnspan=2)
root.mainloop()
```

What is wrong with line 40 of this code?

### Step-by-step debugging and fixes

For the `tkinter` + `matplotlib` + `mplfinance` + `tushare` combination and an error on line 40, these are the common failure scenarios and their fixes:

---

#### **1. Backend not configured**

```python
# The backend must be set before importing any other matplotlib module
import matplotlib as mpl
mpl.use('TkAgg')  # force the Tkinter backend
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
```

- **Symptom**: without the backend set, line 40 may raise `ImportError` or `RuntimeError`, complaining that a GUI canvas cannot be created.
- **Fix**: configure the backend at the very top of the script.

---

#### **2. Canvas not attached to the window**

```python
# Line 40 may be the canvas-attachment step
canvas = FigureCanvasTkAgg(fig, master=root)
canvas.draw()
canvas.get_tk_widget().pack()  # a geometry-manager call is mandatory
```

- **Symptom**: omitting `pack()`/`grid()`/`place()` leaves the canvas invisible without an error; if `fig` is undefined, a `NameError` is raised.
- **Fix**: check that the `fig` variable is defined and that a layout method is called.

---

#### **3. Wrong mplfinance plotting parameters**

```python
# Line 40 may be the mplfinance.plot() call
import mplfinance as mpf
fig = mpf.plot(df, type='candle', returnfig=True)[0]  # returnfig is required to get the Figure back
```

- **Symptom**: without `returnfig=True`, `mpf.plot()` displays the chart itself and returns nothing, so `fig` stays undefined.
- **Fix**: add `returnfig=True` and verify that `df` has the structure mplfinance requires (it must contain `Open/High/Low/Close` columns).

---

#### **4. tushare data fetch failed**

```python
# Line 40 may be the data-fetching call
import tushare as ts
df = ts.get_k_data('000001', start='2023-01-01')  # needs a valid stock code and network access
```

- **Symptom**: with a network problem or an invalid stock code, `df` comes back empty and the plotting code that follows raises `KeyError` or `AttributeError`.
- **Fix**: add exception handling and validate the data:

```python
try:
    df = ts.get_k_data('000001', start='2023-01-01')
    if df.empty:
        raise ValueError("data fetch failed; check the stock code or the network")
except Exception as e:
    print(f"tushare error: {e}")
```

---

#### **5. Thread-safety problems**

- **Symptom**: updating GUI elements from a non-main thread (e.g. redrawing the canvas on line 40) triggers `RuntimeError: main thread is not in main loop`.
- **Fix**: schedule GUI updates through `root.after()`:

```python
def update_plot():
    # logic that updates the canvas
    canvas.draw()

root.after(0, update_plot)  # ensures execution on the main thread
```

---

### **Example skeleton**

```python
import tkinter as tk
import matplotlib as mpl
mpl.use('TkAgg')  # key setting
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import pandas as pd
import mplfinance as mpf
import tushare as ts

def plot_stock():
    try:
        df = ts.get_k_data('000001', start='2023-01-01')
        # mplfinance needs a DatetimeIndex and capitalized OHLC column names
        df['date'] = pd.to_datetime(df['date'])
        df = df.set_index('date').rename(columns={
            'open': 'Open', 'high': 'High', 'low': 'Low',
            'close': 'Close', 'volume': 'Volume'})
        fig = mpf.plot(df, type='candle', returnfig=True)[0]  # get the Figure object back
        canvas = FigureCanvasTkAgg(fig, master=root)
        canvas.draw()
        canvas.get_tk_widget().pack()
    except Exception as e:
        print(f"error: {e}")

root = tk.Tk()
btn = tk.Button(root, text="绘图", command=plot_stock)
btn.pack()
root.mainloop()
```

---

### **Related questions**

1. How can I verify that `matplotlib` is actually using the Tkinter backend?
2. How do I customize chart style (colors, size) when plotting with `mplfinance`?
3. How should permission errors or interface changes be handled when fetching data with `tushare`?

Related recommendations

```python
import re
import traceback
import pandas as pd
import numpy as np
import tkinter as tk
from tkinter import messagebox, ttk, filedialog
import matplotlib.pyplot as plt
from matplotlib import font_manager
from matplotlib.colors import Normalize, LogNorm
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import seaborn as sns
from scipy.ndimage import gaussian_filter
import chardet

def clean_time_input(time_str):
    """Normalize a time string to the YYYY/MM/DD HH:MM format."""
    try:
        # Strip all spaces
        time_str = time_str.replace(" ", "")
        # Accept several date/time separator styles
        match = re.match(
            r"(\d{4})[/-]?(\d{1,2})[/-]?(\d{1,2})[\sT]?(\d{1,2}:\d{2}|\d{4})",
            time_str
        )
        if not match:
            raise ValueError("unrecognized time format")
        year, month, day, time = match.groups()
        # Insert the colon when the time part has none
        if ":" not in time:
            time = f"{time[:2]}:{time[2:]}"
        # Zero-pad month and day
        month = month.zfill(2)
        day = day.zfill(2)
        return f"{year}/{month}/{day} {time}"
    except Exception as e:
        raise ValueError(f"time parsing failed: {str(e)}")

# Main window
root = tk.Tk()
root.title("电导率热力图查询工具")
root.geometry("800x700")
root.minsize(700, 600)

# Font setup: SimHei.ttf must sit in the working directory to render Chinese labels
my_font = font_manager.FontProperties(fname="SimHei.ttf")
plt.rcParams['font.family'] = my_font.get_name()
plt.rcParams['axes.unicode_minus'] = False

def detect_encoding(file_path):
    """Sniff a file's encoding with chardet."""
    with open(file_path, 'rb') as f:
        result = chardet.detect(f.read())
    return result['encoding']

# Read the six data files, detecting each file's encoding first,
# and parse the timestamp column uniformly
try:
    dfs = {}
    for i in range(1, 7):
        name = f"新疆和田{i}电导率.csv"
        df = pd.read_csv(name, encoding=detect_encoding(name))
        df['上报时间'] = pd.to_datetime(df['上报时间'])
        dfs[i] = df
    df_1, df_2, df_3, df_4, df_5, df_6 = (dfs[i] for i in range(1, 7))
except FileNotFoundError as e:
    messagebox.showerror("文件错误", f"数据文件缺失: {e.filename}")
    root.quit()
    exit(1)

depth_order = ['d1', 'd2', 'd3', 'd4', 'd5', 'd6', 'd7', 'd8']
depth_cm = {"d1": "10cm", "d2": "30cm", "d3": "50cm", "d4": "70cm",
            "d5": "90cm", "d6": "110cm", "d7": "130cm", "d8": "150cm"}

# ========= Query area (top) =========
frame_query = ttk.Frame(root, padding=10)
frame_query.pack(fill='x')

# Time-range inputs
frame_time = ttk.Frame(frame_query)
frame_time.pack(side='left', padx=5)
ttk.Label(frame_time, text="开始时间:", font=("微软雅黑", 11)).pack()
entry_start = ttk.Entry(frame_time, width=20, font=("微软雅黑", 11))
entry_start.pack()
entry_start.insert(0, "2025/6/30 17:00")
ttk.Label(frame_time, text="结束时间:", font=("微软雅黑", 11)).pack()
entry_end = ttk.Entry(frame_time, width=20, font=("微软雅黑", 11))
entry_end.pack()
entry_end.insert(0, "2025/6/30 17:10")

# Plot (field) checkboxes, laid out in three columns
frame_plot = ttk.Frame(frame_query)
frame_plot.pack(side='left', padx=10)
plot_vars = {}
for i, plot_id in enumerate(["地块1", "地块2", "地块3", "地块4", "地块5", "地块6"]):
    var = tk.BooleanVar(value=True)
    plot_vars[plot_id] = var
    cb = ttk.Checkbutton(frame_plot, text=plot_id, variable=var)
    cb.grid(row=i // 3, column=i % 3, sticky='w')

# Action buttons and colormap picker
frame_buttons = ttk.Frame(frame_query)
frame_buttons.pack(side='left', padx=10)
btn_query = ttk.Button(frame_buttons, text="生成热力图", width=15)
btn_query.pack(pady=5)
btn_save = ttk.Button(frame_buttons, text="保存图表", width=15)
btn_save.pack(pady=5)

cmap_options = ["YlGn", "YlOrRd", "coolwarm", "viridis", "plasma", "magma"]
cmap_var = tk.StringVar(value=cmap_options[0])
frame_cmap = ttk.Frame(frame_query)
frame_cmap.pack(side='left', padx=10)
ttk.Label(frame_cmap, text="颜色方案:").pack()
cmap_menu = ttk.OptionMenu(frame_cmap, cmap_var, *cmap_options)
cmap_menu.pack()

# ========= Status bar and progress bar =========
status_frame = ttk.Frame(root)
status_frame.pack(side='bottom', fill='x')
status_var = tk.StringVar()
status_var.set("请输入时间后点击生成热力图")
status_bar = ttk.Label(status_frame, textvariable=status_var,
                       relief='sunken', anchor='w', padding=5)
status_bar.pack(side='left', fill='x', expand=True)
progress_var = tk.DoubleVar()
progress_bar = ttk.Progressbar(status_frame, variable=progress_var, maximum=100)
progress_bar.pack(side='right', fill='x', padx=5, pady=2, ipady=2)

# ========= Chart display area =========
frame_canvas = ttk.Frame(root)
frame_canvas.pack(fill='both', expand=True, padx=10, pady=10)
canvas_widget = None

def extract_data(df, label):
    """Extract the mean value per depth for the selected time window."""
    global start_time, end_time  # use the global time bounds
    try:
        df_copy = df.copy()
        mask = (df_copy['上报时间'] >= start_time) & (df_copy['上报时间'] <= end_time)
        time_df = df_copy[mask]
        if time_df.empty:
            return None
        # Pivot to one column of mean readings per depth
        pivot = time_df.pivot_table(index='属性标识符', columns='设备名称',
                                    values='属性值', aggfunc='mean')
        pivot = pivot.reindex(depth_order)  # keep depths in a fixed order
        pivot.columns = [label]
        return pivot
    except Exception as e:
        print(f"extract_data error: {str(e)}")
        return None

def save_plot():
    if canvas_widget is None:
        messagebox.showwarning("警告", "没有可保存的图表")
        return
    file_path = filedialog.asksaveasfilename(
        defaultextension=".png",
        filetypes=[("PNG 图片", "*.png"), ("所有文件", "*.*")]
    )
    if file_path:
        canvas_widget.figure.savefig(file_path, dpi=300)
        status_var.set(f"图表已保存到: {file_path}")

def show_heatmap():
    global canvas_widget, start_time, end_time
    try:
        # 1. Parse and validate the time range
        try:
            start_time = pd.to_datetime(clean_time_input(entry_start.get()))
            end_time = pd.to_datetime(clean_time_input(entry_end.get()))
        except ValueError as e:
            messagebox.showerror(
                "时间格式错误",
                f"请使用标准时间格式(如 2025/06/30 17:00)\n当前输入: {entry_start.get()}\n错误: {str(e)}"
            )
            return
        if start_time > end_time:
            messagebox.showerror("时间错误", "开始时间不能晚于结束时间")
            return
        # 2. Check plot selection
        selected_plots = [k for k, v in plot_vars.items() if v.get()]
        if not selected_plots:
            messagebox.showwarning("警告", "请至少选择一个地块")
            return
        # 3. Extract data for the selected plots
        data_frames = []
        plot_mapping = {"地块1": df_1, "地块2": df_2, "地块3": df_3,
                        "地块4": df_4, "地块5": df_5, "地块6": df_6}
        for plot in selected_plots:
            df = extract_data(plot_mapping[plot], plot)
            if df is not None:
                data_frames.append(df)
        if not data_frames:
            messagebox.showinfo("无数据", "选定时间范围内没有数据")
            return
        combined = pd.concat(data_frames, axis=1)
        combined.index = [depth_cm.get(i, i) for i in combined.index]
        # 4. Draw the heatmap
        fig, ax = plt.subplots(figsize=(10, 7))
        combined_numeric = combined.apply(pd.to_numeric, errors='coerce')
        sns.heatmap(combined_numeric, annot=True, cmap=cmap_var.get(), fmt=".1f",
                    cbar_kws={'label': '电导率 (μS/cm)'}, ax=ax)
        # 5. Replace the previous canvas with the new figure
        for widget in frame_canvas.winfo_children():
            widget.destroy()
        canvas_widget = FigureCanvasTkAgg(fig, master=frame_canvas)
        canvas_widget.draw()
        canvas_widget.get_tk_widget().pack(fill='both', expand=True)
        status_var.set("热力图生成成功")
    except Exception as e:
        messagebox.showerror("生成错误", f"发生错误: {str(e)}")
        print(f"详细错误: {traceback.format_exc()}")

def on_query_click():
    show_heatmap()

btn_query.config(command=on_query_click)
btn_save.config(command=save_plot)

def on_mouse_wheel(event, entry):
    """Scroll the mouse wheel over a time entry to step it by one day."""
    try:
        current_time = pd.to_datetime(entry.get())
        if event.delta > 0:  # scroll up
            new_time = current_time + pd.Timedelta(days=1)
            direction = "前进"
        else:  # scroll down
            new_time = current_time - pd.Timedelta(days=1)
            direction = "后退"
        entry.delete(0, tk.END)
        entry.insert(0, new_time.strftime("%Y/%m/%d %H:%M"))
        status_var.set(f"时间调整: {direction} 1天 (当前: {new_time.strftime('%Y/%m/%d %H:%M')})")
        show_heatmap()  # refresh the heatmap automatically
    except Exception:
        status_var.set("时间调整失败")

# Enter key triggers a query; mouse wheel steps the dates
entry_start.bind("<Return>", lambda event: on_query_click())
entry_start.bind("<MouseWheel>", lambda event: on_mouse_wheel(event, entry_start))
entry_end.bind("<Return>", lambda event: on_query_click())
entry_end.bind("<MouseWheel>", lambda event: on_mouse_wheel(event, entry_end))

# Exit the program when the window is closed
root.protocol("WM_DELETE_WINDOW", root.quit)
root.mainloop()
```

Please help me optimize this code so that it generates a smooth heatmap.
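One common way to make a per-cell annotated heatmap look continuous is to upsample the pivot grid and then blur it before plotting; `gaussian_filter` is already imported in the code above. A minimal sketch on a hypothetical 8×6 grid (depths × plots) standing in for the `combined_numeric` pivot:

```python
import numpy as np
from scipy.ndimage import gaussian_filter, zoom

# Hypothetical 8x6 grid of conductivity readings standing in for the pivot table
rng = np.random.default_rng(0)
grid = rng.uniform(200, 800, size=(8, 6))

# Upsample 4x so the heatmap has enough cells to look continuous,
# then blur; order=3 gives cubic interpolation, mode='nearest' avoids edge artifacts
dense = zoom(grid, 4, order=3)                       # shape (32, 24)
smooth = gaussian_filter(dense, sigma=2, mode='nearest')
```

`smooth` can then be handed to `sns.heatmap(...)` or `ax.imshow(...)` in place of the raw pivot; `annot=True` should be dropped since the upsampled cells no longer map one-to-one onto sensor readings, and any NaN cells should be filled before calling `zoom`.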

```python
import threading
import numpy as np
import pandas as pd
from pandas import ExcelWriter
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from skopt import BayesSearchCV
from skopt.space import Real, Categorical, Integer

# Global font configuration
mpl.rcParams['font.family'] = 'sans-serif'
mpl.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei']
mpl.rcParams['axes.unicode_minus'] = False


class MLGUI:
    def __init__(self, master):
        self.master = master
        master.title("智能预测建模系统 v5.0")
        master.geometry("1400x900")

        # State variables
        self.file_path = tk.StringVar()
        self.n_features = tk.IntVar(value=5)
        self.n_targets = tk.IntVar(value=1)
        self.results = {}
        self.predictions = pd.DataFrame()
        self.export_dataframe = pd.DataFrame()
        self.scalers = {'X': None, 'Y': None}

        # Model configuration (uniform naming)
        self.model_config = {
            "BP神经网络": {"var": tk.BooleanVar(value=True), "optimize": tk.BooleanVar(value=True)},
            "随机森林": {"var": tk.BooleanVar(value=True), "optimize": tk.BooleanVar()},
            "支持向量回归": {"var": tk.BooleanVar(value=True), "optimize": tk.BooleanVar(value=True)}
        }
        self.create_widgets()

    def create_widgets(self):
        # Control panel
        control_frame = ttk.LabelFrame(self.master, text="控制面板", width=350)
        control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=10, pady=10)

        # File selection
        ttk.Label(control_frame, text="1. 选择数据集:").grid(row=0, column=0, sticky=tk.W, pady=5)
        ttk.Entry(control_frame, textvariable=self.file_path, width=30).grid(row=1, column=0, padx=5)
        ttk.Button(control_frame, text="浏览...", command=self.select_file).grid(row=1, column=1)

        # Variable counts
        ttk.Label(control_frame, text="2. 设置变量数量:").grid(row=2, column=0, sticky=tk.W, pady=10)
        ttk.Label(control_frame, text="自变量列数:").grid(row=3, column=0, sticky=tk.W)
        ttk.Spinbox(control_frame, from_=1, to=10, textvariable=self.n_features, width=8).grid(row=3, column=1)
        ttk.Label(control_frame, text="因变量列数:").grid(row=4, column=0, sticky=tk.W)
        ttk.Spinbox(control_frame, from_=1, to=20, textvariable=self.n_targets, width=8).grid(row=4, column=1)

        # Model selection
        ttk.Label(control_frame, text="3. 选择模型 (✓优化启用贝叶斯调参):").grid(row=5, column=0, sticky=tk.W, pady=10)
        row = 6
        for model, config in self.model_config.items():
            ttk.Checkbutton(control_frame, text=model, variable=config['var']).grid(row=row, column=0, sticky=tk.W)
            ttk.Checkbutton(control_frame, text="优化", variable=config['optimize']).grid(row=row, column=1)
            row += 1

        # Action buttons
        ttk.Button(control_frame, text="开始建模预测", command=self.start_analysis,
                   style='Accent.TButton').grid(row=10, column=0, columnspan=2, pady=10)
        ttk.Button(control_frame, text="导出结果", command=self.export_data,
                   style='Accent.TButton').grid(row=11, column=0, columnspan=2, pady=10)

        # Results panel
        result_frame = ttk.LabelFrame(self.master, text="预测结果")
        result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Plot area
        self.figure = plt.Figure(figsize=(10, 10), dpi=100)
        self.canvas = FigureCanvasTkAgg(self.figure, master=result_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

        # Status bar
        self.status = ttk.Label(self.master, text="就绪", relief=tk.SUNKEN, anchor=tk.W)
        self.status.pack(side=tk.BOTTOM, fill=tk.X)

    def select_file(self):
        path = filedialog.askopenfilename(filetypes=[("CSV文件", "*.csv")])
        if path:
            self.file_path.set(path)
            self.preview_data()

    def preview_data(self):
        try:
            data = pd.read_csv(self.file_path.get())
            self.n_features.set(min(5, len(data.columns) - 1))
            self.n_targets.set(1)
            self.update_status(f"已加载数据集: {len(data)} 行 × {len(data.columns)} 列")
        except Exception as e:
            self.update_status(f"错误: {str(e)}", error=True)

    def update_status(self, text, error=False):
        self.status.config(text=text, foreground="red" if error else "black")

    def start_analysis(self):
        if not self.file_path.get():
            self.update_status("错误: 请先选择数据集文件!", error=True)
            return
        threading.Thread(target=self.run_analysis).start()

    def run_analysis(self):
        try:
            self.master.config(cursor="watch")
            self.results = {}
            self.predictions = pd.DataFrame()
            self.scalers = {'X': None, 'Y': None}

            # Load data
            self.update_status("正在加载数据...")
            data = pd.read_csv(self.file_path.get())
            n_total = len(data.columns)
            n_features = self.n_features.get()
            n_targets = self.n_targets.get()
            if n_features + n_targets > n_total:
                raise ValueError(f"总列数不足:需要{n_features+n_targets}列,实际{n_total}列")

            X = data.iloc[:, :n_features].values
            Y = data.iloc[:, n_features:n_features + n_targets].values
            target_names = data.columns[n_features:n_features + n_targets].tolist()

            # Standardize features and targets
            self.scalers['X'] = StandardScaler()
            X_scaled = self.scalers['X'].fit_transform(X)
            self.scalers['Y'] = StandardScaler()
            Y_scaled = self.scalers['Y'].fit_transform(Y)

            # Search spaces for Bayesian optimization
            param_spaces = {
                "BP神经网络": {
                    'hidden_layer_sizes': Categorical([(50, 30), (30, 15), (20, 10)]),
                    'activation': Categorical(['relu', 'tanh']),
                    'alpha': Real(1e-5, 1e-3, prior='log-uniform'),
                    'learning_rate_init': Real(0.001, 0.1),
                    'max_iter': Integer(1000, 3000)  # noticeably wider iteration range
                },
                "随机森林": {
                    'n_estimators': Integer(50, 200),
                    'max_depth': Integer(5, 30),
                    'min_samples_split': Integer(2, 10)
                },
                "支持向量回归": {
                    'C': Real(0.1, 1000, prior='log-uniform'),
                    'gamma': Real(1e-4, 1e1, prior='log-uniform'),
                    'epsilon': Real(0.01, 0.2)
                }
            }

            # Base model configuration
            base_models = {
                "BP神经网络": MLPRegressor(
                    early_stopping=True, n_iter_no_change=30,
                    random_state=42, max_iter=2000  # default iteration cap
                ),
                "随机森林": RandomForestRegressor(random_state=42, n_estimators=100),
                "支持向量回归": SVR(kernel='rbf', C=10, gamma='scale')
            }

            all_results = []
            pred_df = pd.DataFrame()

            # Train/prediction split
            X_train, X_pred, Y_train_scaled, Y_pred_true_scaled = train_test_split(
                X_scaled, Y_scaled, test_size=0.3, random_state=42)

            for target_idx in range(n_targets):
                y_train_scaled = Y_train_scaled[:, target_idx]
                y_pred_true_scaled = Y_pred_true_scaled[:, target_idx]
                target_name = target_names[target_idx]

                # Inverse-transform the held-out target values
                actual_values = self.scalers['Y'].inverse_transform(
                    np.concatenate(
                        [y_pred_true_scaled.reshape(-1, 1),
                         np.zeros((len(y_pred_true_scaled), n_targets - 1))],
                        axis=1
                    )
                )[:, target_idx]
                pred_df[f'{target_name}_实际值'] = actual_values

                for model_name in self.model_config:
                    if not self.model_config[model_name]['var'].get():
                        continue
                    try:
                        if self.model_config[model_name]['optimize'].get():
                            # Bayesian hyperparameter search
                            self.update_status(f"优化中 [{model_name}-{target_name}]...")
                            opt = BayesSearchCV(
                                estimator=base_models[model_name],
                                search_spaces=param_spaces[model_name],
                                n_iter=30, cv=5, n_jobs=-1, random_state=42
                            )
                            opt.fit(X_train, y_train_scaled)
                            model = opt.best_estimator_
                            params = opt.best_params_
                        else:
                            # Default training
                            self.update_status(f"训练中 [{model_name}-{target_name}]...")
                            model = base_models[model_name].set_params(**{
                                'verbose': False,
                                'random_state': 42
                            })
                            if model_name == "支持向量回归":
                                model.set_params(**{'max_iter': 5000})  # raise SVR iteration cap
                            model.fit(X_train, y_train_scaled)
                            params = model.get_params()

                        # Predict and evaluate
                        try:
                            y_pred_scaled = model.predict(X_pred)
                            y_pred = self.scalers['Y'].inverse_transform(
                                np.concatenate(
                                    [y_pred_scaled.reshape(-1, 1),
                                     np.zeros((len(y_pred_scaled), n_targets - 1))],
                                    axis=1
                                )
                            )[:, target_idx]
                            y_true = pred_df[f'{target_name}_实际值'].values
                            mse = mean_squared_error(y_true, y_pred)
                            r2 = r2_score(y_true, y_pred)
                        except Exception as pred_error:
                            self.update_status(f"{model_name}预测失败: {str(pred_error)}", error=True)
                            continue

                        # Store results
                        pred_col = f'{target_name}_{model_name}'
                        if self.model_config[model_name]['optimize'].get():
                            pred_col += '(优化)'
                        pred_df[pred_col] = y_pred
                        all_results.append({
                            "目标变量": target_name,
                            "模型": f"{model_name}{'(优化)' if self.model_config[model_name]['optimize'].get() else ''}",
                            "参数": str(params),
                            "MSE": mse,
                            "R2": r2
                        })
                    except Exception as e:
                        self.update_status(f"{model_name}训练失败: {str(e)}", error=True)
                        continue

            self.export_dataframe = pd.DataFrame(all_results)
            self.predictions = pred_df
            self.master.after(0, self.visualize_results)
            self.update_status("建模完成!")
        except Exception as e:
            self.update_status(f"系统错误: {str(e)}", error=True)
        finally:
            self.master.config(cursor="")

    def visualize_results(self):
        self.figure.clear()
        ax1 = self.figure.add_subplot(211)
        ax2 = self.figure.add_subplot(212)
        ax1.axis('off')

        # Performance table
        table_data = []
        columns = ['目标变量', '模型', 'MSE', 'R²']
        for _, row in self.export_dataframe.iterrows():
            table_data.append([row['目标变量'], row['模型'],
                               f"{row['MSE']:.4f}", f"{row['R2']:.4f}"])
        table = ax1.table(cellText=table_data, colLabels=columns, loc='center',
                          cellLoc='center', colColours=['#f0f0f0'] * 4)
        table.set_fontsize(8)
        table.scale(1.2, 1.2)
        ax1.set_title('模型性能评估表', fontsize=10)

        # Actual vs. predicted curves
        if not self.predictions.empty:
            target_columns = [col for col in self.predictions.columns if '_实际值' in col]
            for idx, target_col in enumerate(target_columns[:2]):
                target_name = target_col.replace('_实际值', '')
                actual = self.predictions[target_col]
                ax2.plot(actual.values, label=f'{target_name} 实际值', color='blue',
                         linestyle='--', marker='o', markersize=4)
                model_cols = [col for col in self.predictions.columns
                              if col.startswith(f'{target_name}_') and '实际值' not in col]
                colors = ['green', 'red', 'purple', 'orange']
                for i, model_col in enumerate(model_cols):
                    model_name = model_col.split('_')[-1].replace('(优化)', '')
                    pred = self.predictions[model_col]
                    ax2.plot(pred.values, label=f'{model_name}预测', color=colors[i],
                             alpha=0.7, linestyle='-', linewidth=1.5)
            ax2.set_xlabel('样本序号', fontsize=9)
            ax2.set_ylabel('数值', fontsize=9)
            ax2.set_title(f'{target_name} 实际值与预测值对比', fontsize=10)
            ax2.legend(loc='upper right', fontsize=8)
            ax2.grid(True, linestyle='--', alpha=0.5)
            ax2.tick_params(axis='both', labelsize=8)

        self.figure.tight_layout()
        self.canvas.draw()

    def export_data(self):
        if self.export_dataframe.empty or self.predictions.empty:
            messagebox.showerror("错误", "没有可导出的结果!")
            return
        path = filedialog.asksaveasfilename(
            defaultextension=".xlsx",
            filetypes=[('Excel 文件', '*.xlsx'), ('CSV 文件', '*.csv')]
        )
        if path:
            try:
                if path.endswith('.xlsx'):
                    with ExcelWriter(path) as writer:
                        self.export_dataframe.to_excel(writer, index=False, sheet_name='模型评估')
                        self.predictions.to_excel(writer, index=False, sheet_name='预测结果')
                else:
                    self.export_dataframe.to_csv(path, index=False)
                    self.predictions.to_csv(f"{path}_predictions.csv", index=False)
                messagebox.showinfo("成功", f"结果已保存至:{path}")
            except Exception as e:
                messagebox.showerror("导出失败", f"错误原因:{str(e)}")


if __name__ == "__main__":
    root = tk.Tk()
    style = ttk.Style()
    style.configure('Accent.TButton', foreground='white', background='#0078D7')
    app = MLGUI(root)
    root.mainloop()
```

When this code runs, the BP neural network disappears once Bayesian optimization is enabled, and the support vector regression disappears when optimization is not enabled. Please fix these problems and provide the complete corrected code.
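One likely cause of the "disappearing" models: in the non-optimized branch every estimator receives `set_params(verbose=False, random_state=42)`, but `SVR` accepts no `random_state` parameter, so `set_params` raises `ValueError` and the surrounding `try/except` silently skips that model (a similar parameter mismatch inside the search space can drop an optimized model too). A hedged sketch of a guard that only applies the parameters an estimator actually supports; `safe_set_params` is a hypothetical helper name, not part of scikit-learn:

```python
from sklearn.neural_network import MLPRegressor
from sklearn.svm import SVR

def safe_set_params(model, **params):
    """Apply only the keyword parameters this estimator actually accepts.

    Unknown keys make set_params raise ValueError; a broad try/except
    around the training loop then swallows the error, so the model
    silently vanishes from the results.
    """
    accepted = model.get_params()                                # all valid parameter names
    usable = {k: v for k, v in params.items() if k in accepted}  # drop unsupported keys
    return model.set_params(**usable)

svr = safe_set_params(SVR(), verbose=False, random_state=42)     # random_state dropped for SVR
mlp = safe_set_params(MLPRegressor(), verbose=False, random_state=42)
```

Logging the caught exception (e.g. via `traceback.format_exc()`) instead of only updating the status label would also make this class of failure visible immediately.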

```python
import os
import time
import numpy as np
import pandas as pd
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Lambda
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler

# Chinese font support; unicode_minus=False keeps the ASCII minus sign
mpl.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']
mpl.rcParams['axes.unicode_minus'] = False
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False


class PINNModel(tf.keras.Model):
    def __init__(self, num_layers=4, hidden_units=32, **kwargs):
        super(PINNModel, self).__init__(**kwargs)
        self.dense_layers = [Dense(hidden_units, activation='tanh') for _ in range(num_layers)]
        self.final_layer = Dense(1, activation='linear')
        # Trainable physical parameter with a sigmoid constraint
        self.k_raw = tf.Variable(0.01, trainable=True, dtype=tf.float32, name='k_raw')
        self.k = tf.math.sigmoid(self.k_raw) * 0.5  # constrained to (0, 0.5)

    def call(self, inputs):
        t, h = inputs
        x = tf.concat([t, h], axis=1)
        for layer in self.dense_layers:
            x = layer(x)
        return self.final_layer(x)

    def physics_loss(self, t, h_current):
        """Physics loss based on the discretized seepage equation."""
        # Predict the next water level
        h_next_pred = self([t, h_current])
        # Discrete seepage equation: h_{t+1} = h_t - k * h_t (time step = 1)
        residual = h_next_pred - h_current * (1 - self.k)
        return tf.reduce_mean(tf.square(residual))


class DamSeepageModel:
    def __init__(self, root):
        self.root = root
        self.root.title("大坝渗流预测模型(PINNs)")
        self.root.geometry("1200x800")
        # Data holders
        self.train_df = None   # training set
        self.valid_df = None   # validation set
        self.test_df = None    # test set
        self.model = None
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.evaluation_metrics = {}
        self.create_widgets()

    def create_widgets(self):
        main_frame = ttk.Frame(self.root, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)

        # Left control panel
        control_frame = ttk.LabelFrame(main_frame, text="模型控制", padding=10)
        control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5, pady=5)

        # File selection
        file_frame = ttk.LabelFrame(control_frame, text="数据文件", padding=10)
        file_frame.pack(fill=tk.X, pady=5)
        ttk.Label(file_frame, text="训练集:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.train_file_var = tk.StringVar()
        ttk.Entry(file_frame, textvariable=self.train_file_var, width=30,
                  state='readonly').grid(row=0, column=1, padx=5)
        ttk.Button(file_frame, text="选择文件",
                   command=lambda: self.select_file("train")).grid(row=0, column=2)
        ttk.Label(file_frame, text="测试集:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.test_file_var = tk.StringVar()
        ttk.Entry(file_frame, textvariable=self.test_file_var, width=30,
                  state='readonly').grid(row=1, column=1, padx=5)
        ttk.Button(file_frame, text="选择文件",
                   command=lambda: self.select_file("test")).grid(row=1, column=2)

        # PINN hyperparameters
        param_frame = ttk.LabelFrame(control_frame, text="PINNs参数", padding=10)
        param_frame.pack(fill=tk.X, pady=10)
        ttk.Label(param_frame, text="网络层数:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.num_layers_var = tk.IntVar(value=4)
        ttk.Spinbox(param_frame, from_=2, to=8, increment=1,
                    textvariable=self.num_layers_var, width=10).grid(row=0, column=1, padx=5)
        ttk.Label(param_frame, text="神经元数/层:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.hidden_units_var = tk.IntVar(value=32)
        ttk.Spinbox(param_frame, from_=16, to=128, increment=4,
                    textvariable=self.hidden_units_var, width=10).grid(row=1, column=1, padx=5)
        ttk.Label(param_frame, text="训练轮次:").grid(row=2, column=0, sticky=tk.W, pady=5)
        self.epochs_var = tk.IntVar(value=500)
        ttk.Spinbox(param_frame, from_=100, to=2000, increment=100,
                    textvariable=self.epochs_var, width=10).grid(row=2, column=1, padx=5)
        ttk.Label(param_frame, text="物理损失权重:").grid(row=3, column=0, sticky=tk.W, pady=5)
        self.physics_weight_var = tk.DoubleVar(value=0.5)
        ttk.Spinbox(param_frame, from_=0.1, to=1.0, increment=0.1,
                    textvariable=self.physics_weight_var, width=10).grid(row=3, column=1, padx=5)

        # Action buttons
        btn_frame = ttk.Frame(control_frame)
        btn_frame.pack(fill=tk.X, pady=10)
        ttk.Button(btn_frame, text="训练模型", command=self.train_model).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="预测结果", command=self.predict).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="保存结果", command=self.save_results).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="重置", command=self.reset).pack(side=tk.RIGHT, padx=5)

        # Status bar
        self.status_var = tk.StringVar(value="就绪")
        status_bar = ttk.Label(control_frame, textvariable=self.status_var,
                               relief=tk.SUNKEN, anchor=tk.W)
        status_bar.pack(fill=tk.X, side=tk.BOTTOM)

        # Right-hand results area with tabbed pages
        result_frame = ttk.Frame(main_frame)
        result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.notebook = ttk.Notebook(result_frame)
        self.notebook.pack(fill=tk.BOTH, expand=True)
        self.loss_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.loss_frame, text="训练损失")
        self.prediction_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.prediction_frame, text="预测结果")

        # Metrics display
        self.metrics_var = tk.StringVar()
        metrics_label = ttk.Label(self.prediction_frame, textvariable=self.metrics_var,
                                  font=('TkDefaultFont', 10, 'bold'), relief='ridge', padding=5)
        metrics_label.pack(fill=tk.X, padx=5, pady=5)

        # Plot canvases
        self.fig, self.ax = plt.subplots(figsize=(10, 6))
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.prediction_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
        self.loss_fig, self.loss_ax = plt.subplots(figsize=(10, 4))
        self.loss_canvas = FigureCanvasTkAgg(self.loss_fig, master=self.loss_frame)
        self.loss_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def select_file(self, file_type):
        """Pick an Excel file for the given split."""
        file_path = filedialog.askopenfilename(
            title=f"选择{file_type}集Excel文件",
            filetypes=[("Excel文件", "*.xlsx *.xls"), ("所有文件", "*.*")]
        )
        if file_path:
            try:
                df = pd.read_excel(file_path)
                # The file must contain preprocessed time-feature columns
                time_features = ['year', 'month', 'day']
                missing_time_features = [feat for feat in time_features if feat not in df.columns]
                if missing_time_features:
                    messagebox.showerror(
                        "列名错误",
                        f"Excel文件缺少预处理后的时间特征列: {', '.join(missing_time_features)}")
                    return
                # Build a timestamp column from whatever time units are present
                time_cols = ['year', 'month', 'day']
                if 'hour' in df.columns:
                    time_cols.append('hour')
                if 'minute' in df.columns:
                    time_cols.append('minute')
                if 'second' in df.columns:
                    time_cols.append('second')
                # Fill in missing time units
                for col in ['hour', 'minute', 'second']:
                    if col not in df.columns:
                        df[col] = 0
                df['datetime'] = pd.to_datetime(df[time_cols])
                df = df.set_index('datetime')
                # Elapsed time in days since the first record
                df['days'] = (df.index - df.index[0]).days
                # Store the data
                if file_type == "train":
                    self.train_df = df
                    self.train_file_var.set(os.path.basename(file_path))
                    self.status_var.set(f"已加载训练集: {len(self.train_df)}条数据")
                else:
                    self.test_df = df
                    self.test_file_var.set(os.path.basename(file_path))
                    self.status_var.set(f"已加载测试集: {len(self.test_df)}条数据")
            except Exception as e:
                messagebox.showerror("文件错误", f"读取文件失败: {str(e)}")

    def calculate_metrics(self, y_true, y_pred):
        """Standard regression metrics; MAPE skips zero targets."""
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true, y_pred)
        non_zero_idx = np.where(y_true != 0)[0]
        if len(non_zero_idx) > 0:
            mape = np.mean(np.abs((y_true[non_zero_idx] - y_pred[non_zero_idx])
                                  / y_true[non_zero_idx])) * 100
        else:
            mape = float('nan')
        r2 = r2_score(y_true, y_pred)
        return {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'MAPE': mape, 'R2': r2}

    def train_model(self):
        """Train the PINN (with a validation set)."""
        if self.train_df is None:
            messagebox.showwarning("警告", "请先选择训练集文件")
            return
        if self.valid_df is None:
            messagebox.showwarning("警告", "请先选择验证集文件")
            return
        try:
            self.status_var.set("正在预处理数据...")
            self.root.update()
            # Training data: (h_t -> h_{t+1}) pairs
            train_scaled = self.scaler.fit_transform(self.train_df[['水位']])
            t_train = self.train_df['days'].values[1:].reshape(-1, 1).astype(np.float32)
            h_train = train_scaled[:-1].astype(np.float32)
            h_next_train = train_scaled[1:].astype(np.float32)
            # Validation data, scaled with the training scaler to avoid data leakage
            valid_scaled = self.scaler.transform(self.valid_df[['水位']])
            t_valid = self.valid_df['days'].values[1:].reshape(-1, 1).astype(np.float32)
            h_valid = valid_scaled[:-1].astype(np.float32)
            h_next_valid = valid_scaled[1:].astype(np.float32)

            # Model and optimizer
            self.model = PINNModel(
                num_layers=self.num_layers_var.get(),
                hidden_units=self.hidden_units_var.get()
            )
            optimizer = Adam(learning_rate=0.001)

            # Training/validation datasets
            train_dataset = tf.data.Dataset.from_tensor_slices(((t_train, h_train), h_next_train))
            train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32)
            valid_dataset = tf.data.Dataset.from_tensor_slices(((t_valid, h_valid), h_next_valid))
            valid_dataset = valid_dataset.batch(32)  # no shuffling needed for validation

            # Loss histories (total loss not tracked)
            train_data_loss_history = []
            physics_loss_history = []
            valid_data_loss_history = []
            start_time = time.time()

            # Custom training loop
            for epoch in range(self.epochs_var.get()):
                # Training phase
                epoch_train_data_loss = []
                epoch_physics_loss = []
                for step, ((t_batch, h_batch), h_next_batch) in enumerate(train_dataset):
                    with tf.GradientTape() as tape:
                        h_pred = self.model([t_batch, h_batch])
                        # Track data loss and physics loss separately
                        data_loss = tf.reduce_mean(tf.square(h_next_batch - h_pred))
                        physics_loss = self.model.physics_loss(t_batch, h_batch)
                        # Optimize the weighted combination of the two losses
                        loss = data_loss + self.physics_weight_var.get() * physics_loss
                    grads = tape.gradient(loss, self.model.trainable_variables)
                    optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
                    epoch_train_data_loss.append(data_loss.numpy())
                    epoch_physics_loss.append(physics_loss.numpy())

                # Validation phase (forward pass only, no parameter updates)
                epoch_valid_data_loss = []
                for ((t_v_batch, h_v_batch), h_v_next_batch) in valid_dataset:
                    h_v_pred = self.model([t_v_batch, h_v_batch])
                    valid_data_loss = tf.reduce_mean(tf.square(h_v_next_batch - h_v_pred))
                    epoch_valid_data_loss.append(valid_data_loss.numpy())

                # Average losses for the epoch
                avg_train_data_loss = np.mean(epoch_train_data_loss)
                avg_physics_loss = np.mean(epoch_physics_loss)
                avg_valid_data_loss = np.mean(epoch_valid_data_loss)
                train_data_loss_history.append(avg_train_data_loss)
                physics_loss_history.append(avg_physics_loss)
                valid_data_loss_history.append(avg_valid_data_loss)

                # Status update every 10 epochs (includes validation loss)
                if epoch % 10 == 0:
                    k_value = self.model.k.numpy()
                    elapsed = time.time() - start_time
                    self.status_var.set(
                        f"训练中 | 轮次: {epoch + 1}/{self.epochs_var.get()} | "
                        f"训练数据损失: {avg_train_data_loss:.4f} | "
                        f"物理损失: {avg_physics_loss:.4f} | "
                        f"验证数据损失: {avg_valid_data_loss:.4f} | "
                        f"k: {k_value:.6f} | 时间: {elapsed:.1f}秒"
                    )
                    self.root.update()

            # Loss curves: training data loss, physics loss, validation loss
            self.loss_ax.clear()
            epochs_range = range(1, len(train_data_loss_history) + 1)
            self.loss_ax.plot(epochs_range, train_data_loss_history, 'b-', label='训练数据损失')
            self.loss_ax.plot(epochs_range, physics_loss_history, 'r--', label='物理损失')
            self.loss_ax.plot(epochs_range, valid_data_loss_history, 'g-.', label='验证数据损失')
            self.loss_ax.set_title('PINNs训练与验证损失')
            self.loss_ax.set_xlabel('轮次')
            self.loss_ax.set_ylabel('损失', rotation=0)
            self.loss_ax.legend()
            self.loss_ax.grid(True)
            self.loss_ax.set_yscale('log')
            self.loss_canvas.draw()

            # Completion summary
            elapsed = time.time() - start_time
            self.status_var.set(
                f"训练完成 | 总轮次: {self.epochs_var.get()} | "
                f"最终训练数据损失: {train_data_loss_history[-1]:.4f} | "
                f"最终物理损失: {physics_loss_history[-1]:.4f} | "
                f"最终验证数据损失: {valid_data_loss_history[-1]:.4f} | "
                f"总时间: {elapsed:.1f}秒"
            )
            messagebox.showinfo("训练完成", "PINNs模型训练成功完成!")
        except Exception as e:
            messagebox.showerror("训练错误", f"模型训练失败:\n{str(e)}")
            self.status_var.set("训练失败")

    def predict(self):
        """Run the trained PINN on the test set."""
        if self.model is None:
            messagebox.showwarning("警告", "请先训练模型")
            return
        if self.test_df is None:
            messagebox.showwarning("警告", "请先选择测试集文件")
            return
        try:
            self.status_var.set("正在生成预测...")
            self.root.update()
            # Preprocess the test data
            test_scaled =
```
self.scaler.transform(self.test_df[['水位']]) # 准备时间特征 t_test = self.test_df['days'].values.reshape(-1, 1).astype(np.float32) # 递归预测 predictions = [] for i in range(len(t_test)): h_current = np.array([[test_scaled[i][0]]]).astype(np.float32) h_pred = self.model([t_test[i:i + 1], h_current]) predictions.append(h_pred.numpy()[0][0]) # 反归一化 predictions = np.array(predictions).reshape(-1, 1) predictions = self.scaler.inverse_transform(predictions) actual_values = self.scaler.inverse_transform(test_scaled) # 创建时间索引 test_time = self.test_df.index # 清除现有图表 self.ax.clear() # 绘制结果 self.ax.plot(test_time, actual_values, 'b-', label='真实值') self.ax.plot(test_time, predictions, 'r--', label='预测值') self.ax.set_title('大坝渗流水位预测结果(PINNs)') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位', rotation=0) self.ax.legend() # 添加网格和样式 self.ax.grid(True, alpha=0.3) # 计算并显示评估指标 self.evaluation_metrics = self.calculate_metrics( actual_values.flatten(), predictions.flatten() ) metrics_text = ( f"MSE: {self.evaluation_metrics['MSE']:.4f} | " f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | " f"MAE: {self.evaluation_metrics['MAE']:.4f} | " f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | " f"R²: {self.evaluation_metrics['R2']:.4f}" ) # 更新文本标签 self.metrics_var.set(metrics_text) # 在图表上添加指标 self.ax.text( 0.5, 1.05, metrics_text, transform=self.ax.transAxes, ha='center', fontsize=10, bbox=dict(facecolor='white', alpha=0.8) ) # 调整布局并显示图表 plt.tight_layout() self.canvas.draw() # 保存预测结果 self.predictions = predictions self.actual_values = actual_values self.test_time = test_time self.status_var.set("预测完成,结果已显示") except Exception as e: messagebox.showerror("预测错误", f"预测失败:\n{str(e)}") self.status_var.set("预测失败") def save_results(self): """保存预测结果""" if not hasattr(self, 'predictions'): messagebox.showwarning("警告", "请先生成预测结果") return save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")] ) if not save_path: return try: # 创建结果DataFrame result_df 
= pd.DataFrame({ '时间': self.test_time, '实际水位': self.actual_values.flatten(), '预测水位': self.predictions.flatten() }) # 创建评估指标DataFrame metrics_df = pd.DataFrame([self.evaluation_metrics]) # 保存到Excel with pd.ExcelWriter(save_path) as writer: result_df.to_excel(writer, sheet_name='预测结果', index=False) metrics_df.to_excel(writer, sheet_name='评估指标', index=False) # 保存图表 chart_path = os.path.splitext(save_path)[0] + "_chart.png" self.fig.savefig(chart_path, dpi=300) self.status_var.set(f"结果已保存至: {os.path.basename(save_path)}") messagebox.showinfo("保存成功", f"预测结果和图表已保存至:\n{save_path}\n{chart_path}") except Exception as e: messagebox.showerror("保存错误", f"保存结果失败:\n{str(e)}") def reset(self): """重置程序状态""" self.train_df = None self.test_df = None self.model = None self.train_file_var.set("") self.test_file_var.set("") # 清除图表 if hasattr(self, 'ax'): self.ax.clear() if hasattr(self, 'loss_ax'): self.loss_ax.clear() # 重绘画布 if hasattr(self, 'canvas'): self.canvas.draw() if hasattr(self, 'loss_canvas'): self.loss_canvas.draw() # 清除状态 self.status_var.set("已重置,请选择新数据") # 清除预测结果 if hasattr(self, 'predictions'): del self.predictions # 清除指标文本 if hasattr(self, 'metrics_var'): self.metrics_var.set("") messagebox.showinfo("重置", "程序已重置,可以开始新的分析") if __name__ == "__main__": root = tk.Tk() app = DamSeepageModel(root) root.mainloop() 帮我检查一下有没有错误,为什么模型让我选择验证集文件,我希望验证集是从训练集中切分出0.2比例
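On the question at the end: the validation set does not have to come from a separate file. You can slice the last 20% of the (already scaled) training series off chronologically inside `train_model`, before forming the `(t, h) → h_next` pairs, and drop the `self.valid_df` file check entirely. A standalone sketch of that split, using a synthetic series in place of the scaled 水位 column (the 0.2 ratio mirrors your request, the pair construction mirrors the code above, and the function names are mine):

```python
import numpy as np

def split_train_valid(t_all, h_scaled, valid_ratio=0.2):
    """Chronologically split a scaled level series into train/valid
    (t, h, h_next) triples -- no shuffling, so the validation block
    is the most recent 20% of the training file."""
    cut = int(len(h_scaled) * (1 - valid_ratio))

    def make_pairs(t, h):
        # predict h[i+1] from (t[i+1], h[i]), as in the original code
        return t[1:].reshape(-1, 1), h[:-1], h[1:]

    train_part = make_pairs(t_all[:cut], h_scaled[:cut])
    valid_part = make_pairs(t_all[cut:], h_scaled[cut:])
    return train_part, valid_part

# synthetic stand-in for self.train_df['days'] and the scaled 水位 column
t_all = np.arange(100, dtype=np.float32)
h_scaled = np.linspace(0, 1, 100, dtype=np.float32).reshape(-1, 1)
train_part, valid_part = split_train_valid(t_all, h_scaled)
print(len(train_part[1]), len(valid_part[1]))  # 79 19
```

A chronological (unshuffled) split is used deliberately: shuffling a time series before splitting would leak future points into the training set. In `train_model`, `valid_scaled` would then come from this slice instead of `self.scaler.transform(self.valid_df[['水位']])`.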

import tkinter as tk from tkinter import ttk, filedialog, messagebox import pandas as pd import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt from matplotlib.font_manager import FontProperties from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from sklearn.preprocessing import MinMaxScaler import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense,Layer,Dropout from tensorflow.keras.optimizers import Adam from tensorflow.keras.callbacks import EarlyStopping,ModelCheckpoint,ReduceLROnPlateau import os plt.rcParams['font.sans-serif'] = ['SimHei'] # 使用黑体 plt.rcParams['axes.unicode_minus'] = False class DamSeepageModel: def __init__(self, root): self.root = root self.root.title("大坝渗流预测模型") self.root.geometry("1200x800") # 初始化数据 self.train_df = None self.test_df = None self.model = None self.scaler = MinMaxScaler(feature_range=(0, 1)) # 创建主界面 self.create_widgets() def create_widgets(self): # 创建主框架 main_frame = ttk.Frame(self.root, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # 左侧控制面板 control_frame = ttk.LabelFrame(main_frame, text="模型控制", padding=10) control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5, pady=5) # 文件选择部分 file_frame = ttk.LabelFrame(control_frame, text="数据文件", padding=10) file_frame.pack(fill=tk.X, pady=5) # 训练集选择 ttk.Label(file_frame, text="训练集:").grid(row=0, column=0, sticky=tk.W, pady=5) self.train_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.train_file_var, width=30, state='readonly').grid(row=0, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("train")).grid(row=0, column=2) # 测试集选择 ttk.Label(file_frame, text="测试集:").grid(row=1, column=0, sticky=tk.W, pady=5) self.test_file_var = tk.StringVar() ttk.Entry(file_frame, textvariable=self.test_file_var, width=30, state='readonly').grid(row=1, column=1, padx=5) ttk.Button(file_frame, text="选择文件", command=lambda: 
self.select_file("test")).grid(row=1, column=2) # 参数设置部分 param_frame = ttk.LabelFrame(control_frame, text="模型参数", padding=10) param_frame.pack(fill=tk.X, pady=10) ttk.Label(param_frame, text="Dropout率:").grid(row=5, column=0, sticky=tk.W, pady=5) self.dropout_rate_var = tk.DoubleVar(value=0.2) ttk.Spinbox(param_frame, from_=0.0, to=0.5, increment=0.05, format="%.2f", textvariable=self.dropout_rate_var, width=10).grid(row=5, column=1, padx=5) # 时间窗口大小 ttk.Label(param_frame, text="时间窗口大小:").grid(row=0, column=0, sticky=tk.W, pady=5) self.window_size_var = tk.IntVar(value=60) ttk.Spinbox(param_frame, from_=10, to=200, increment=5, textvariable=self.window_size_var, width=10).grid(row=0, column=1, padx=5) # LSTM单元数量 ttk.Label(param_frame, text="LSTM单元数:").grid(row=1, column=0, sticky=tk.W, pady=5) self.lstm_units_var = tk.IntVar(value=50) ttk.Spinbox(param_frame, from_=10, to=200, increment=10, textvariable=self.lstm_units_var, width=10).grid(row=1, column=1, padx=5) # 训练轮次 ttk.Label(param_frame, text="训练轮次:").grid(row=2, column=0, sticky=tk.W, pady=5) self.epochs_var = tk.IntVar(value=100) ttk.Spinbox(param_frame, from_=10, to=500, increment=10, textvariable=self.epochs_var, width=10).grid(row=2, column=1, padx=5) # 批处理大小 ttk.Label(param_frame, text="批处理大小:").grid(row=3, column=0, sticky=tk.W, pady=5) self.batch_size_var = tk.IntVar(value=32) ttk.Spinbox(param_frame, from_=16, to=128, increment=16, textvariable=self.batch_size_var, width=10).grid(row=3, column=1, padx=5) # 控制按钮 btn_frame = ttk.Frame(control_frame) btn_frame.pack(fill=tk.X, pady=10) ttk.Button(btn_frame, text="训练模型", command=self.train_model).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="预测结果", command=self.predict).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="保存结果", command=self.save_results).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="重置", command=self.reset).pack(side=tk.RIGHT, padx=5) # 状态栏 self.status_var = tk.StringVar(value="就绪") status_bar = 
ttk.Label(control_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W) status_bar.pack(fill=tk.X, side=tk.BOTTOM) # 右侧结果显示区域 result_frame = ttk.Frame(main_frame) result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5) # 创建标签页 self.notebook = ttk.Notebook(result_frame) self.notebook.pack(fill=tk.BOTH, expand=True) # 损失曲线标签页 self.loss_frame = ttk.Frame(self.notebook) self.notebook.add(self.loss_frame, text="训练损失") # 预测结果标签页 self.prediction_frame = ttk.Frame(self.notebook) self.notebook.add(self.prediction_frame, text="预测结果") # 初始化绘图区域 self.fig, self.ax = plt.subplots(figsize=(10, 6)) self.canvas = FigureCanvasTkAgg(self.fig, master=self.prediction_frame) self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) self.loss_fig, self.loss_ax = plt.subplots(figsize=(10, 4)) self.loss_canvas = FigureCanvasTkAgg(self.loss_fig, master=self.loss_frame) self.loss_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) # 文件选择 def select_file(self, file_type): """选择Excel文件""" file_path = filedialog.askopenfilename( title=f"选择{file_type}集Excel文件", filetypes=[("Excel文件", "*.xlsx *.xls"), ("所有文件", "*.*")] ) if file_path: try: # 读取Excel文件 df = pd.read_excel(file_path) # 时间特征列 time_features = ['year', 'month', 'day'] missing_time_features = [feat for feat in time_features if feat not in df.columns] if '水位' not in df.columns: messagebox.showerror("列名错误", "Excel文件必须包含'水位'列") return if missing_time_features: messagebox.showerror("列名错误", f"Excel文件缺少预处理后的时间特征列: {', '.join(missing_time_features)}\n" "请确保已使用预处理功能添加这些列") return # 创建完整的时间戳列 # 处理可能缺失的小时、分钟、秒数据 if 'hour' in df.columns and 'minute' in df.columns and 'second' in df.columns: df['datetime'] = pd.to_datetime( df[['year', 'month', 'day', 'hour', 'minute', 'second']] ) elif 'hour' in df.columns and 'minute' in df.columns: df['datetime'] = pd.to_datetime( df[['year', 'month', 'day', 'hour', 'minute']].assign(second=0) ) else: df['datetime'] = pd.to_datetime(df[['year', 'month', 'day']]) # 设置时间索引 df 
= df.set_index('datetime') # 保存数据 if file_type == "train": self.train_df = df self.train_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载训练集: {len(self.train_df)}条数据") else: self.test_df = df self.test_file_var.set(os.path.basename(file_path)) self.status_var.set(f"已加载测试集: {len(self.test_df)}条数据") except Exception as e: messagebox.showerror("文件错误", f"读取文件失败: {str(e)}") def create_dataset(self, data, window_size): """创建时间窗口数据集""" X, y = [], [] for i in range(len(data) - window_size): X.append(data[i:(i + window_size), 0]) y.append(data[i + window_size, 0]) return np.array(X), np.array(y) def train_model(self): """训练LSTM模型""" if self.train_df is None: messagebox.showwarning("警告", "请先选择训练集文件") return try: self.status_var.set("正在预处理数据...") self.root.update() # 数据预处理 train_scaled = self.scaler.fit_transform(self.train_df[['水位']]) # 创建时间窗口数据集 window_size = self.window_size_var.get() X_train, y_train = self.create_dataset(train_scaled, window_size) # 调整LSTM输入格式 X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1)) # 构建LSTM模型 self.model = Sequential() self.model.add(LSTM( self.lstm_units_var.get(), return_sequences=True, input_shape=(window_size, 1) )) self.model.add(Dropout(self.dropout_rate_var.get())) # 添加Dropout # 添加额外的LSTM层 for _ in range(self.lstm_layers_var.get() - 1): self.model.add(LSTM(self.lstm_units_var.get())) self.model.add(Dropout(self.dropout_rate_var.get())) # 添加Dropout self.model.add(Dense(1)) self.model.compile( optimizer=Adam(learning_rate=0.001), # 优化器 loss='binary_crossentropy', # 损失函数 metrics=['accuracy'] # 评估指标 ) # 添加更完善的回调机制 early_stopping = EarlyStopping( monitor='val_loss', patience=30, # 增加耐心值 min_delta=0.0001, restore_best_weights=True, verbose=1 ) model_checkpoint = ModelCheckpoint( 'best_model.h5', monitor='val_loss', save_best_only=True, verbose=1 ) reduce_lr = ReduceLROnPlateau( monitor='val_loss', factor=0.4, # 更激进的学习率衰减 patience=20, min_lr=1e-6, verbose=1 ) # 训练模型(添加更多回调) history = self.model.fit( 
X_train, y_train, epochs=self.epochs_var.get(), batch_size=self.batch_size_var.get(), validation_split=0.2, callbacks=[early_stopping, model_checkpoint, reduce_lr], verbose=0 ) # 绘制损失曲线 self.loss_ax.clear() self.loss_ax.plot(history.history['loss'], label='训练损失') self.loss_ax.plot(history.history['val_loss'], label='验证损失') self.loss_ax.set_title('模型训练损失') self.loss_ax.set_xlabel('轮次') self.loss_ax.set_ylabel('损失',rotation=0) self.loss_ax.legend() self.loss_ax.grid(True) self.loss_canvas.draw() # 根据早停情况更新状态信息 if early_stopping.stopped_epoch > 0: stopped_epoch = early_stopping.stopped_epoch best_epoch = early_stopping.best_epoch final_loss = history.history['loss'][-1] best_loss = min(history.history['val_loss']) self.status_var.set( f"训练在{stopped_epoch + 1}轮提前终止 | " f"最佳模型在第{best_epoch + 1}轮 | " f"最终损失: {final_loss:.6f} | " f"最佳验证损失: {best_loss:.6f}" ) messagebox.showinfo( "训练完成", f"模型训练提前终止!\n" f"最佳模型在第{best_epoch + 1}轮\n" f"最佳验证损失: {best_loss:.6f}" ) else: final_loss = history.history['loss'][-1] self.status_var.set(f"模型训练完成 | 最终损失: {final_loss:.6f}") messagebox.showinfo("训练完成", "模型训练成功完成!") except Exception as e: messagebox.showerror("训练错误", f"模型训练失败:\n{str(e)}") self.status_var.set("训练失败") def predict(self): """使用模型进行预测""" if self.model is None: messagebox.showwarning("警告", "请先训练模型") return if self.test_df is None: messagebox.showwarning("警告", "请先选择测试集文件") return try: self.status_var.set("正在生成预测...") self.root.update() # 预处理测试数据 test_scaled = self.scaler.transform(self.test_df[['水位']]) # 创建测试集时间窗口 window_size = self.window_size_var.get() X_test, y_test = self.create_dataset(test_scaled, window_size) X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1)) # 进行预测 test_predict = self.model.predict(X_test) # 反归一化 test_predict = self.scaler.inverse_transform(test_predict) y_test_orig = self.scaler.inverse_transform([y_test]).T # 创建时间索引 test_time = self.test_df.index[window_size:window_size + len(test_predict)] # 绘制结果 self.ax.clear() 
self.ax.plot(self.train_df.index, self.train_df['水位'], 'b-', label='训练集数据') self.ax.plot(test_time, self.test_df['水位'][window_size:window_size + len(test_predict)], 'g-', label='测试集数据') self.ax.plot(test_time, test_predict, 'r--', label='模型预测') # 添加分隔线 split_point = test_time[0] self.ax.axvline(x=split_point, color='k', linestyle='--', alpha=0.5) self.ax.text(split_point, self.ax.get_ylim()[0] * 0.9, ' 训练/测试分界', rotation=90) self.ax.set_title('大坝渗流水位预测结果') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位',rotation=0) self.ax.legend() self.ax.grid(True) self.ax.tick_params(axis='x', rotation=45) self.fig.tight_layout() self.canvas.draw() self.status_var.set("预测完成,结果已显示") except Exception as e: messagebox.showerror("预测错误", f"预测失败:\n{str(e)}") self.status_var.set("预测失败") def save_results(self): """保存预测结果""" if not hasattr(self, 'test_predict') or self.test_predict is None: messagebox.showwarning("警告", "请先生成预测结果") return save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")] ) if not save_path: return try: # 创建包含预测结果的DataFrame window_size = self.window_size_var.get() test_time = self.test_df.index[window_size:window_size + len(self.test_predict)] result_df = pd.DataFrame({ '时间': test_time, '实际水位': self.test_df['水位'][window_size:window_size + len(self.test_predict)].values, '预测水位': self.test_predict.flatten() }) # 保存到Excel result_df.to_excel(save_path, index=False) # 保存图表 chart_path = os.path.splitext(save_path)[0] + "_chart.png" self.fig.savefig(chart_path, dpi=300) self.status_var.set(f"结果已保存至: {os.path.basename(save_path)}") messagebox.showinfo("保存成功", f"预测结果和图表已保存至:\n{save_path}\n{chart_path}") except Exception as e: messagebox.showerror("保存错误", f"保存结果失败:\n{str(e)}") def reset(self): """重置程序状态""" self.train_df = None self.test_df = None self.model = None self.train_file_var.set("") self.test_file_var.set("") self.ax.clear() self.loss_ax.clear() self.canvas.draw() self.loss_canvas.draw() 
self.status_var.set("已重置,请选择新数据") messagebox.showinfo("重置", "程序已重置,可以开始新的分析") if __name__ == "__main__": root = tk.Tk() app = DamSeepageModel(root) root.mainloop() How can this code be improved to reduce the training and validation loss? (Note: the original `reset` also called `self.data_text.delete(1.0, tk.END)`, but no `data_text` widget is ever created, so that line would raise an `AttributeError` and has been removed.)
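On reducing the losses: the biggest problem is that the model is compiled with `loss='binary_crossentropy'` and `metrics=['accuracy']`, but predicting a scaled water level is a regression task. With soft targets in (0, 1), binary cross-entropy never reaches zero even for a perfect prediction (its floor is the entropy of the targets), so the loss curves look permanently stuck; switching to `'mse'` (with `'mae'` as a readable metric) is the first fix. Two further bugs worth checking: `self.lstm_layers_var` is read in the layer-stacking loop but never created in `create_widgets`, and `predict` stores `test_predict` in a local variable while `save_results` expects `self.test_predict`. A small NumPy sketch of the loss-floor point (illustrative only, not the Keras code itself):

```python
import numpy as np

def bce(y_true, y_pred, eps=1e-7):
    """Binary cross-entropy, as Keras computes it for soft targets."""
    p = np.clip(y_pred, eps, 1 - eps)
    return float(np.mean(-y_true * np.log(p) - (1 - y_true) * np.log(1 - p)))

def mse(y_true, y_pred):
    return float(np.mean((y_true - y_pred) ** 2))

# scaled water levels (regression targets in (0, 1)), predicted perfectly
y = np.array([0.2, 0.5, 0.8])
print(mse(y, y))  # 0.0 -- a perfect fit reaches zero
print(bce(y, y))  # ≈ 0.565 -- stuck at the entropy floor even when perfect
```

The corresponding change in `train_model` would be `self.model.compile(optimizer=Adam(learning_rate=0.001), loss='mse', metrics=['mae'])`.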

import tkinter as tk from tkinter import messagebox import matplotlib.pyplot as plt import numpy as np subjects = ['C语言', 'C语言', '高数', '英语', 'c++', '体育'] scores = [85, 85, 78, 88, 90] def zf(): plt.bar(subjects, scores, color='skyblue') plt.title('直方图') plt.xlabel('科目') plt.ylabel('分数') for i in range(len(subjects)): plt.text(i, scores[i], scores[i], ha='center', va='bottom') plt.show() def zx(): plt.plot(subjects, scores, marker='o', linestyle='-', color='skyblue') plt.title('折线图') plt.xlabel('科目') plt.ylabel('分数') for i in range(len(subjects)): plt.text(subjects[i], scores[i], scores[i], ha='center', va='bottom') plt.show() def ct(): plt.rcParams['figure.dpi'] = 300 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.bar(subjects, scores, color='skyblue', edgecolor='black') plt.title('条形图') plt.xlabel('科目') plt.ylabel('分数') for i in range(len(subjects)): plt.text(i, scores[i], scores[i], ha='center', va='bottom') plt.show() def mj(): plt.rcParams['figure.dpi'] = 300 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False x = np.arange(len(subjects)) plt.fill_between(x, scores, color='skyblue', alpha=0.7) plt.xticks(x, subjects) plt.title('面积图') plt.xlabel('科目') plt.ylabel('分数') for i in range(len(subjects)): plt.text(x[i], scores[i], scores[i], ha='center', va='bottom') plt.show() def yx(): plt.rcParams['figure.dpi'] = 300 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.pie(scores, labels=subjects, autopct='%1.1f%%', startangle=90) plt.title('各科目分数占比饼图') plt.axis('equal') plt.show() def cf(subjects, scores): new_subjects = [] new_scores = [] for i in range(len(subjects)): if subjects[i] not in new_subjects: new_subjects.
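The `cf` function above is cut off mid-statement; from context it is deduplicating subjects. Note also that `subjects` has six entries while `scores` has five, and `plt.bar`/`plt.plot` require the two lists to be the same length, so the deduplicated pair is what should be plotted. A hedged sketch of one way the function could be completed (keeping each subject's first score; using `zip`, which also truncates to the shorter list):

```python
def cf(subjects, scores):
    """Drop duplicate subjects, keeping the first score seen for each."""
    new_subjects, new_scores = [], []
    for subj, score in zip(subjects, scores):
        if subj not in new_subjects:
            new_subjects.append(subj)
            new_scores.append(score)
    return new_subjects, new_scores

print(cf(['C语言', 'C语言', '高数', '英语', 'c++'],
         [85, 85, 78, 88, 90]))
# (['C语言', '高数', '英语', 'c++'], [85, 78, 88, 90])
```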

import sys import io import os import numpy as np import matplotlib as mpl import matplotlib.pyplot as plt import scapy.all as scapy from collections import defaultdict from sklearn.preprocessing import StandardScaler from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score import pandas as pd from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.dns import DNS, DNSRR from datetime import datetime import tkinter as tk from tkinter import filedialog, messagebox # 设置Matplotlib中文显示支持 plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei'] plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 # 设置系统输出编码为UTF-8 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') if os.name == 'nt': # Windows系统专用设置 os.system('chcp 65001') # 切换到UTF-8代码页 # ====================== # 1. 文件选择窗口 # ====================== def select_file(title, filetypes=None, mode='open'): """创建文件/目录选择窗口""" root = tk.Tk() root.withdraw() # 隐藏主窗口 if mode == 'open': file_path = filedialog.askopenfilename( title=title, filetypes=filetypes ) elif mode == 'directory': file_path = filedialog.askdirectory(title=title) else: file_path = filedialog.asksaveasfilename( title=title, filetypes=filetypes, defaultextension=".csv" ) root.destroy() return file_path print("请选择PCAP文件...") name_pcap = select_file( "选择PCAP文件", filetypes=[("PCAP文件", "*.pcap"), ("PCAPNG文件", "*.pcapng"), ("所有文件", "*.*")], mode='open' ) if not name_pcap: print("未选择文件,程序退出") sys.exit(0) print(f"已选择文件: {name_pcap}") # ====================== # 2. 
PCAP文件解析与流重组 # ====================== print("正在解析PCAP文件...") # DNS解析缓存 dns_cache = {} # 流式解析大PCAP文件 def parse_large_pcap(pcap_path): flows = defaultdict(list) # 使用字典存储网络流 packet_count = 0 # 总包计数器 ip_packet_count = 0 # IP包计数器 global dns_cache # 使用全局DNS缓存 # 使用PcapReader流式读取 with scapy.PcapReader(pcap_path) as pcap_reader: for pkt in pcap_reader: packet_count += 1 # 进度显示(每10000个包) if packet_count % 10000 == 0: print(f"已处理 {packet_count} 个包...") # DNS响应解析 if DNS in pkt and pkt[DNS].qr == 1: # 只处理DNS响应 # 获取实际可用的回答记录数量 actual_ancount = len(pkt[DNS].an) if hasattr(pkt[DNS], 'an') else 0 # 使用实际数量而不是ancount字段 for i in range(min(pkt[DNS].ancount, actual_ancount)): rr = pkt[DNS].an[i] if isinstance(rr, DNSRR) and rr.type == 1: # A记录 try: ip = rr.rdata # 正确处理域名格式 domain = rr.rrname.decode('utf-8', errors='ignore').rstrip('.') # 添加到DNS缓存 if ip not in dns_cache: dns_cache[ip] = set() dns_cache[ip].add(domain) except Exception as e: # 忽略解析错误 pass # 仅处理IP层数据包 if IP in pkt: ip_packet_count += 1 ip_layer = pkt[IP] proto_num = ip_layer.proto # 获取协议号 # 初始化端口变量 src_port, dst_port = 0, 0 # 根据协议类型提取端口信息 if proto_num == 6 and TCP in pkt: # TCP协议 src_port = pkt[TCP].sport dst_port = pkt[TCP].dport elif proto_num == 17 and UDP in pkt: # UDP协议 src_port = pkt[UDP].sport dst_port = pkt[UDP].dport elif proto_num == 1 and ICMP in pkt: # ICMP协议 # 使用类型和代码代替端口 src_port = pkt[ICMP].type dst_port = pkt[ICMP].code # 创建五元组作为流的唯一标识 flow_key = (ip_layer.src, ip_layer.dst, src_port, dst_port, proto_num) flows[flow_key].append(pkt) # 将包添加到对应流 print(f"共处理 {packet_count} 个包,其中 {ip_packet_count} 个IP包") print(f"DNS缓存大小: {len(dns_cache)} 个IP到域名的映射") return flows # 解析PCAP文件 flows = parse_large_pcap(name_pcap) print(f"共识别出 {len(flows)} 个网络流") # ====================== # 3. 
特征工程 # ====================== print("正在提取流量特征...") features = [] # 存储特征向量 valid_flows = [] # 存储有效流的标识 # 只初始化源时间列表 src_start_times = [] src_end_times = [] min_packets = 5 # 流的最小包数阈值 for flow_key, packets in flows.items(): packet_count = len(packets) # 过滤包数不足的流 if packet_count < min_packets: continue # 1. 基础统计特征 total_bytes = sum(len(p) for p in packets) # 总字节数 # 2. 时序特征(包到达间隔) timestamps = np.array([float(p.time) for p in packets]) if packet_count > 1: iat = np.diff(timestamps) # 包间隔 mean_iat = np.mean(iat) # 平均间隔 std_iat = np.std(iat) if len(iat) > 1 else 0 # 间隔标准差 else: mean_iat = std_iat = 0 # 3. 包长特征 pkt_lengths = np.array([len(p) for p in packets]) mean_pkt_len = np.mean(pkt_lengths) # 平均包长 std_pkt_len = np.std(pkt_lengths) if packet_count > 1 else 0 # 包长标准差 min_pkt_len = np.min(pkt_lengths) # 最小包长 max_pkt_len = np.max(pkt_lengths) # 最大包长 # 4. 方向特征(上行比例) src_ip = flow_key[0] # 流源IP dst_ip = flow_key[1] # 流目的IP # 判断数据包方向(1=上行,0=下行) directions = np.array([1 if p[IP].src == src_ip else 0 for p in packets]) up_ratio = np.mean(directions) # 上行包比例 # 源IP发送的包的时间戳 src_pkt_times = [float(p.time) for p in packets if p[IP].src == src_ip] # 计算源的起始和终止时间 src_start = min(src_pkt_times) if src_pkt_times else np.nan src_end = max(src_pkt_times) if src_pkt_times else np.nan # 添加时间信息到列表 src_start_times.append(src_start) src_end_times.append(src_end) # 添加特征向量 features.append([ packet_count, total_bytes, mean_iat, std_iat, mean_pkt_len, std_pkt_len, min_pkt_len, max_pkt_len, up_ratio ]) valid_flows.append(flow_key) # 记录有效流 if not features: # 错误处理:无有效流 print("错误:没有提取到任何有效网络流") print("可能原因:") print("1. 所有流都少于5个包(尝试减小过滤阈值)") print("2. 文件格式不兼容(尝试用Wireshark打开验证)") print("3. 没有IP流量(检查网络捕获配置)") sys.exit(1) feature_matrix = np.array(features) print(f"提取了 {len(features)} 个有效流的特征") # ====================== # 4. 数据标准化 # ====================== print("正在进行数据标准化...") scaler = StandardScaler() scaled_features = scaler.fit_transform(feature_matrix) # Z-score标准化 # ====================== # 5. 
肘部法则确定最佳K值 # ====================== print("使用肘部法则确定最佳聚类数...") n_samples = scaled_features.shape[0] # 样本数量 # 动态调整K值范围 if n_samples < 5: print(f"警告:样本数量较少({n_samples}),将使用简化聚类策略") k_range = range(1, min(11, n_samples + 1)) else: k_range = range(1, 11) # 测试1-10个聚类 sse = [] # 存储SSE(误差平方和) kmeans_models = {} # 存储不同K值的模型 for k in k_range: # 跳过超过样本数的K值 if k > n_samples: print(f"跳过K={k}(超过样本数{n_samples})") continue # 训练K-means模型 kmeans = KMeans(n_clusters=k, n_init=10, random_state=42) kmeans.fit(scaled_features) sse.append(kmeans.inertia_) # 记录SSE kmeans_models[k] = kmeans # 存储模型 # 样本不足时的处理 if len(sse) < 2: optimal_k = min(2, n_samples) # 至少2类(不超过样本数) print(f"样本过少,直接设置K={optimal_k}") else: # 绘制肘部法则图 plt.figure(figsize=(10, 6)) plt.plot(list(k_range)[:len(sse)], sse, 'bo-') plt.xlabel('聚类数量 $ K$') plt.ylabel('SSE (误差平方和)') plt.title('肘部法则确定最佳聚类数') plt.grid(True) plt.savefig('elbow_method.png', dpi=300) plt.close() print("肘部法则图已保存为 elbow_method.png") # 自动检测拐点(二阶差分最小值) if len(sse) >= 3: knee_point = np.argmin(np.diff(sse, 2)) + 2 # 计算二阶差分 optimal_k = max(2, min(10, knee_point)) # 确保K在2-10之间 else: optimal_k = max(2, min(3, n_samples)) # 样本少时选择2或3 print(f"自动检测的最佳聚类数: K = {optimal_k}") # ====================== # 6. K-means聚类 # ====================== print(f"正在进行K-means聚类 (K={optimal_k})...") # 调整K值不超过样本数 if optimal_k > n_samples: optimal_k = max(1, n_samples) print(f"调整聚类数为样本数: K = {optimal_k}") # 使用已有模型或新建模型 if optimal_k in kmeans_models: kmeans = kmeans_models[optimal_k] cluster_labels = kmeans.labels_ else: kmeans = KMeans(n_clusters=optimal_k, n_init=10, random_state=42) cluster_labels = kmeans.fit_predict(scaled_features) # 计算轮廓系数(K>1时) if optimal_k > 1: silhouette_avg = silhouette_score(scaled_features, cluster_labels) print(f"轮廓系数: {silhouette_avg:.4f} (接近1表示聚类效果良好)") else: print("聚类数为1,无需计算轮廓系数") # ====================== # 7. 
结果分析与保存 # ====================== # 创建结果DataFrame result_df = pd.DataFrame({ '源IP': [flow[0] for flow in valid_flows], '目的IP': [flow[1] for flow in valid_flows], '协议': [f"TCP(6)" if flow[4] == 6 else f"UDP(17)" for flow in valid_flows], '聚类标签': cluster_labels, '源起始时间': src_start_times, '源终止时间': src_end_times }) # 添加域名信息 def get_domains(ip): """获取IP对应的域名列表""" domains = dns_cache.get(ip, set()) return ", ".join(domains) if domains else "N/A" result_df['目的域名'] = result_df['目的IP'].apply(get_domains) # 添加包数量和总字节数 result_df['包数量'] = feature_matrix[:, 0] result_df['总字节数'] = feature_matrix[:, 1] # 时间格式化函数 def format_timestamp(ts): """将时间戳格式化为可读字符串""" if np.isnan(ts): return "N/A" return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 保留毫秒 # 应用时间格式化 result_df['源起始时间'] = result_df['源起始时间'].apply(format_timestamp) result_df['源终止时间'] = result_df['源终止时间'].apply(format_timestamp) # 选择需要的列 final_columns = [ '源IP', '目的IP', '协议', '源起始时间', '源终止时间', '目的域名', '包数量', '总字节数' ] result_df = result_df[final_columns] # 保存结果 - 让用户选择保存位置 print("请选择保存结果文件的文件夹...") output_dir = select_file("选择结果保存文件夹", mode='directory') if not output_dir: print("未选择保存目录,使用当前目录") output_dir = os.getcwd() # 创建完整文件路径 output_filename = "result.csv" output_path = os.path.join(output_dir, output_filename) # 检查文件是否存在,避免覆盖 counter = 1 while os.path.exists(output_path): new_filename = f"result_{counter}.csv" output_path = os.path.join(output_dir, new_filename) counter += 1 # 保存文件 try: result_df.to_csv(output_path, index=False, encoding='utf_8_sig') print(f"聚类结果已保存为 {output_path}") # 显示保存成功消息 root = tk.Tk() root.withdraw() messagebox.showinfo("保存成功", f"结果文件已保存至:\n{output_path}") root.destroy() except Exception as e: print(f"保存文件时出错: {e}") # 尝试在临时目录保存 temp_dir = os.path.join(os.environ.get('TEMP', ''), 'netflow_clustering') os.makedirs(temp_dir, exist_ok=True) temp_path = os.path.join(temp_dir, output_filename) try: result_df.to_csv(temp_path, index=False, encoding='utf_8_sig') print(f"聚类结果已保存为 
{temp_path}") # 显示保存位置消息 root = tk.Tk() root.withdraw() messagebox.showinfo("保存成功", f"结果文件已保存至临时目录:\n{temp_path}") root.destroy() except Exception as e2: print(f"临时目录保存失败: {e2}") print("无法保存结果文件") # 最终输出 print("\n聚类完成!") 添加功能 现在我需要根据每个用户单独分类,定义为一个用户的标准误为IP是“10.”开头的,例如10.194.145.237 。然后去源IP和目的IP中去找含有10.194.145.237的数据,整合到一个单独的Excel文件进行输出。让用户能够自定义文件的存储路径
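For the requested per-user export: intranet users can be identified by IPs starting with `10.`, and every row mentioning a given user in either 源IP or 目的IP collected into one Excel file at a user-chosen path. (Incidentally, the 协议 column built above labels every non-TCP flow as `UDP(17)`, including ICMP flows; that may be worth fixing too.) A sketch of the filtering and saving step, shown here on a toy DataFrame standing in for `result_df` (the real script would reuse `select_file`/`filedialog` for the path, and `to_excel` needs `openpyxl` installed):

```python
import pandas as pd

def extract_user_flows(result_df, user_ip):
    """Rows where the user appears as source or destination."""
    mask = (result_df['源IP'] == user_ip) | (result_df['目的IP'] == user_ip)
    return result_df[mask].copy()

def user_ips(result_df, prefix='10.'):
    """All intranet user IPs (prefix '10.') seen in the result."""
    ips = pd.concat([result_df['源IP'], result_df['目的IP']]).unique()
    return sorted(ip for ip in ips if ip.startswith(prefix))

# toy stand-in for the clustering result
df = pd.DataFrame({
    '源IP': ['10.194.145.237', '8.8.8.8', '10.0.0.2'],
    '目的IP': ['1.1.1.1', '10.194.145.237', '9.9.9.9'],
})
print(user_ips(df))                    # ['10.0.0.2', '10.194.145.237']
one_user = extract_user_flows(df, '10.194.145.237')
print(len(one_user))                   # 2
# saving to a user-chosen path would look like:
# path = filedialog.asksaveasfilename(defaultextension=".xlsx",
#                                     filetypes=[("Excel文件", "*.xlsx")])
# one_user.to_excel(path, index=False)
```

Looping `for ip in user_ips(result_df): extract_user_flows(result_df, ip)` would produce one table per intranet user, each of which could be written either to its own file or to its own sheet via `pd.ExcelWriter`.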

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from PIL import Image, ImageDraw
import cv2
import os
import csv
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler

# 设置中文字体和负号显示
plt.rcParams["font.family"] = ["SimHei", "Microsoft YaHei"]
plt.rcParams["axes.unicode_minus"] = False

# 尝试导入XGBoost和LightGBM(使用延迟导入减少启动时间)
XGB_INSTALLED = False
LGB_INSTALLED = False
try:
    import xgboost as xgb
    XGB_INSTALLED = True
except ImportError:
    print("警告: 未安装XGBoost库,无法使用XGBoost模型")

try:
    import lightgbm as lgb
    LGB_INSTALLED = True
except ImportError:
    print("警告: 未安装LightGBM库,无法使用LightGBM模型")

# 定义模型元数据常量(优化参数)
MODEL_METADATA = {
    'svm': ('支持向量机(SVM)', SVC, StandardScaler, {'probability': True, 'random_state': 42}),
    'dt': ('决策树(DT)', DecisionTreeClassifier, None, {'random_state': 42}),
    'rf': ('随机森林(RF)', RandomForestClassifier, None, {'n_estimators': 100, 'random_state': 42}),
    'mlp': ('多层感知机(MLP)', MLPClassifier, StandardScaler,
            {'hidden_layer_sizes': (100, 50), 'max_iter': 500, 'random_state': 42}),
    'knn': ('K最近邻(KNN)', KNeighborsClassifier, StandardScaler, {'n_neighbors': 5, 'weights': 'distance'}),
    'nb': ('高斯朴素贝叶斯(NB)', GaussianNB, None, {}),
}

# 添加可选模型
if XGB_INSTALLED:
    MODEL_METADATA['xgb'] = ('XGBoost(XGB)', xgb.XGBClassifier, None,
                             {'objective': 'multi:softmax', 'random_state': 42})
if LGB_INSTALLED:
    MODEL_METADATA['lgb'] = ('LightGBM(LGB)', lgb.LGBMClassifier, None, {
        'objective': 'multiclass', 'random_state': 42, 'num_class': 10,
        'max_depth': 5, 'min_child_samples': 10, 'learning_rate': 0.1,
        'force_col_wise': True
    })


class ModelFactory:
    @staticmethod
    def get_split_data(digits_dataset):
        """数据集划分"""
        X, y = digits_dataset.data, digits_dataset.target
        return train_test_split(X, y, test_size=0.3, random_state=42)

    @classmethod
    def create_model(cls, model_type):
        """创建模型和数据标准化器"""
        if model_type not in MODEL_METADATA:
            raise ValueError(f"未知模型类型: {model_type}")
        name, model_cls, scaler_cls, params = MODEL_METADATA[model_type]
        if not model_cls:
            raise ImportError(f"{name}模型依赖库未安装")
        model = model_cls(**params)
        scaler = scaler_cls() if scaler_cls else None
        return model, scaler

    @staticmethod
    def train_model(model, X_train, y_train, scaler=None, model_type=None):
        """训练模型"""
        if scaler:
            X_train = scaler.fit_transform(X_train)
        if model_type == 'lgb' and isinstance(X_train, np.ndarray):
            X_train = pd.DataFrame(X_train)
        model.fit(X_train, y_train)
        return model

    @staticmethod
    def evaluate_model(model, X_test, y_test, scaler=None, model_type=None):
        """评估模型"""
        if scaler:
            X_test = scaler.transform(X_test)
        if model_type == 'lgb' and isinstance(X_test, np.ndarray) and hasattr(model, 'feature_name_'):
            X_test = pd.DataFrame(X_test, columns=model.feature_name_)
        y_pred = model.predict(X_test)
        return accuracy_score(y_test, y_pred)

    @classmethod
    def train_and_evaluate(cls, model_type, X_train, y_train, X_test, y_test):
        """训练并评估模型"""
        try:
            model, scaler = cls.create_model(model_type)
            model = cls.train_model(model, X_train, y_train, scaler, model_type)
            accuracy = cls.evaluate_model(model, X_test, y_test, scaler, model_type)
            return model, scaler, accuracy
        except Exception as e:
            print(f"模型 {model_type} 训练/评估错误: {str(e)}")
            raise

    @classmethod
    def evaluate_all_models(cls, digits_dataset):
        """评估所有可用模型"""
        print("\n=== 模型评估 ===")
        X_train, X_test, y_train, y_test = cls.get_split_data(digits_dataset)
        results = []
        for model_type in MODEL_METADATA:
            name = MODEL_METADATA[model_type][0]
            print(f"评估模型: {name} ({model_type})")
            if not MODEL_METADATA[model_type][1]:
                results.append({"模型名称": name, "准确率": "N/A"})
                continue
            try:
                _, _, accuracy = cls.train_and_evaluate(
                    model_type, X_train, y_train, X_test, y_test
                )
                results.append({"模型名称": name, "准确率": f"{accuracy:.4f}"})
            except Exception as e:
                results.append({"模型名称": name, "准确率": f"错误: {str(e)}"})
        # 按准确率排序
        results.sort(
            key=lambda x: float(x["准确率"])
            if isinstance(x["准确率"], str) and x["准确率"].replace('.', '', 1).isdigit()
            else -1,
            reverse=True
        )
        print(pd.DataFrame(results))
        return results


class HandwritingBoard:
    CANVAS_SIZE = 300  # 固定画布尺寸
    BRUSH_SIZE = 12    # 画笔大小

    def __init__(self, root, model_factory, digits):
        self.root = root
        self.root.title("手写数字识别系统")
        self.root.geometry("900x650")
        self.model_factory = model_factory
        self.digits = digits
        self.model_cache = {}
        self.current_model = None
        self.scaler = None
        self.current_model_type = None
        self.has_drawn = False
        self.custom_data = []
        self.drawing = False
        self.last_x = self.last_y = 0
        # 自定义数据目录
        self.data_dir = "custom_digits_data"
        os.makedirs(self.data_dir, exist_ok=True)
        # 初始化画布
        self.image = Image.new("L", (self.CANVAS_SIZE, self.CANVAS_SIZE), 255)
        self.draw_obj = ImageDraw.Draw(self.image)
        self.create_widgets()
        self.init_default_model()

    def create_widgets(self):
        """创建界面组件"""
        main_frame = tk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        # 左侧绘图区域
        left_frame = tk.Frame(main_frame)
        left_frame.pack(side=tk.LEFT, fill=tk.BOTH, padx=(0, 10))
        canvas_frame = tk.LabelFrame(left_frame, text="绘制区域", font=("Arial", 10))
        canvas_frame.pack(fill=tk.BOTH, expand=True)
        self.canvas = tk.Canvas(canvas_frame, bg="white", width=self.CANVAS_SIZE, height=self.CANVAS_SIZE)
        self.canvas.pack(padx=5, pady=5)
        self.canvas.bind("<Button-1>", self.start_draw)
        self.canvas.bind("<B1-Motion>", self.draw)
        self.canvas.bind("<ButtonRelease-1>", self.stop_draw)
        # 添加绘制提示
        self.canvas.create_text(
            self.CANVAS_SIZE / 2, self.CANVAS_SIZE / 2,
            text="绘制数字", fill="gray", font=("Arial", 16)
        )
        # 绘图控制按钮
        btn_frame = tk.Frame(left_frame)
        btn_frame.pack(fill=tk.X, pady=(5, 0))
        tk.Button(btn_frame, text="识别", command=self.recognize, width=8).pack(side=tk.LEFT, padx=2)
        tk.Button(btn_frame, text="清除", command=self.clear_canvas, width=8).pack(side=tk.LEFT, padx=2)
        tk.Button(btn_frame, text="样本", command=self.show_samples, width=8).pack(side=tk.LEFT, padx=2)
        # 右侧控制面板
        right_frame = tk.Frame(main_frame)
        right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
        # 模型选择
        model_frame = tk.LabelFrame(right_frame, text="模型选择", font=("Arial", 10))
        model_frame.pack(fill=tk.X, pady=(0, 10))
        tk.Label(model_frame, text="选择模型:").pack(side=tk.LEFT, padx=5)
        self.available_models = []
        for model_type, (name, _, _, _) in MODEL_METADATA.items():
            if MODEL_METADATA[model_type][1]:
                self.available_models.append((model_type, name))
        self.model_var = tk.StringVar()
        self.model_combobox = ttk.Combobox(
            model_frame, textvariable=self.model_var,
            values=[name for _, name in self.available_models],
            state="readonly", width=20
        )
        self.model_combobox.current(0)
        self.model_combobox.pack(side=tk.LEFT, padx=5)
        self.model_combobox.bind("<<ComboboxSelected>>", self.on_model_select)
        # 当前模型信息
        self.model_label = tk.Label(
            model_frame, text="", font=("Arial", 10),
            relief=tk.SUNKEN, padx=5, pady=2, width=30
        )
        self.model_label.pack(side=tk.RIGHT, padx=5)
        # 识别结果
        result_frame = tk.LabelFrame(right_frame, text="识别结果", font=("Arial", 10))
        result_frame.pack(fill=tk.X, pady=(0, 10))
        self.result_label = tk.Label(
            result_frame, text="请绘制数字", font=("Arial", 24), pady=10
        )
        self.result_label.pack()
        self.prob_label = tk.Label(result_frame, text="", font=("Arial", 10))
        self.prob_label.pack()
        # 置信度可视化
        confidence_frame = tk.LabelFrame(right_frame, text="识别置信度", font=("Arial", 10))
        confidence_frame.pack(fill=tk.X, pady=(0, 10))
        self.confidence_canvas = tk.Canvas(confidence_frame, bg="white", height=40)
        self.confidence_canvas.pack(fill=tk.X, padx=5, pady=5)
        # 候选数字
        candidates_frame = tk.LabelFrame(right_frame, text="可能的数字", font=("Arial", 10))
        candidates_frame.pack(fill=tk.X, pady=(0, 10))
        columns = ("数字", "概率")
        self.candidates_tree = ttk.Treeview(
            candidates_frame, columns=columns, show="headings", height=4
        )
        for col in columns:
            self.candidates_tree.heading(col, text=col)
            self.candidates_tree.column(col, width=70, anchor=tk.CENTER)
        scrollbar = ttk.Scrollbar(
            candidates_frame, orient=tk.VERTICAL, command=self.candidates_tree.yview
        )
        self.candidates_tree.configure(yscroll=scrollbar.set)
        self.candidates_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        # 模型性能
        performance_frame = tk.LabelFrame(right_frame, text="模型性能对比", font=("Arial", 10))
        performance_frame.pack(fill=tk.BOTH, expand=True)
        columns = ("模型名称", "准确率")
        self.performance_tree = ttk.Treeview(
            performance_frame, columns=columns, show="headings", height=8
        )
        for col in columns:
            self.performance_tree.heading(col, text=col)
            self.performance_tree.column(col, width=100, anchor=tk.CENTER)
        scrollbar = ttk.Scrollbar(
            performance_frame, orient=tk.VERTICAL, command=self.performance_tree.yview
        )
        self.performance_tree.configure(yscroll=scrollbar.set)
        self.performance_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        # 训练集管理
        train_frame = tk.Frame(right_frame)
        train_frame.pack(fill=tk.X, pady=(10, 0))
        tk.Button(train_frame, text="保存为训练样本", command=self.save_as_training_sample, width=15).pack(side=tk.LEFT, padx=2)
        tk.Button(train_frame, text="保存全部训练集", command=self.save_all_training_data, width=15).pack(side=tk.LEFT, padx=2)
        tk.Button(train_frame, text="加载训练集", command=self.load_training_data, width=15).pack(side=tk.LEFT, padx=2)
        tk.Button(train_frame, text="性能图表", command=self.show_performance_chart, width=15).pack(side=tk.LEFT, padx=2)
        # 状态信息
        self.status_var = tk.StringVar(value="就绪")
        status_bar = tk.Label(
            self.root, textvariable=self.status_var,
            bd=1, relief=tk.SUNKEN, anchor=tk.W, font=("Arial", 9)
        )
        status_bar.pack(side=tk.BOTTOM, fill=tk.X)

    def start_draw(self, event):
        """开始绘制"""
        self.drawing = True
        self.last_x, self.last_y = event.x, event.y

    def draw(self, event):
        """绘制"""
        if not self.drawing:
            return
        x, y = event.x, event.y
        # 在画布上绘制
        self.canvas.create_line(
            self.last_x, self.last_y, x, y,
            fill="black", width=self.BRUSH_SIZE,
            capstyle=tk.ROUND, smooth=True
        )
        # 在图像上绘制
        self.draw_obj.line(
            [self.last_x, self.last_y, x, y],
            fill=0, width=self.BRUSH_SIZE
        )
        self.last_x, self.last_y = x, y

    def stop_draw(self, event):
        """停止绘制"""
        self.drawing = False
        self.has_drawn = True
        self.status_var.set("已绘制数字,点击'识别'进行识别")

    def clear_canvas(self):
        """清除画布"""
        self.canvas.delete("all")
        self.image = Image.new("L", (self.CANVAS_SIZE, self.CANVAS_SIZE), 255)
        self.draw_obj = ImageDraw.Draw(self.image)
        # 添加绘制提示
        self.canvas.create_text(
            self.CANVAS_SIZE / 2, self.CANVAS_SIZE / 2,
            text="绘制数字", fill="gray", font=("Arial", 16)
        )
        self.result_label.config(text="请绘制数字")
        self.prob_label.config(text="")
        self.clear_confidence_display()
        self.has_drawn = False
        self.status_var.set("画布已清除")

    def clear_confidence_display(self):
        """清除置信度显示"""
        self.confidence_canvas.delete("all")
        self.confidence_canvas.create_text(
            150, 20, text="识别后显示置信度", fill="gray", font=("Arial", 10)
        )
        for item in self.candidates_tree.get_children():
            self.candidates_tree.delete(item)

    def preprocess_image(self):
        """预处理手写数字图像"""
        img_array = np.array(self.image)
        # 高斯模糊降噪
        img_array = cv2.GaussianBlur(img_array, (5, 5), 0)
        # 二值化
        _, img_array = cv2.threshold(img_array, 127, 255, cv2.THRESH_BINARY_INV)
        # 轮廓检测
        contours, _ = cv2.findContours(img_array, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            self.status_var.set("未检测到有效数字,请重新绘制")
            return None
        # 找到最大轮廓
        c = max(contours, key=cv2.contourArea)
        x, y, w, h = cv2.boundingRect(c)
        # 提取数字区域
        digit = img_array[y:y+h, x:x+w]
        # 填充为正方形
        size = max(w, h)
        padded = np.ones((size, size), dtype=np.uint8) * 255
        offset_x = (size - w) // 2
        offset_y = (size - h) // 2
        padded[offset_y:offset_y+h, offset_x:offset_x+w] = digit
        # 缩放为8x8
        resized = cv2.resize(padded, (8, 8), interpolation=cv2.INTER_AREA)
        # 归一化
        normalized = 16 - (resized / 255 * 16).astype(np.uint8)
        return normalized.flatten()

    def recognize(self):
        """识别手写数字"""
        if not self.has_drawn:
            self.status_var.set("请先绘制数字再识别")
            return
        if self.current_model is None:
            self.status_var.set("模型未加载,请选择模型")
            return
        # 预处理图像
        img_array = self.preprocess_image()
        if img_array is None:
            return
        img_input = img_array.reshape(1, -1)
        try:
            # 标准化
            if self.scaler:
                img_input = self.scaler.transform(img_input)
            # LightGBM特殊处理
            if self.current_model_type == 'lgb' and hasattr(self.current_model, 'feature_name_'):
                img_input = pd.DataFrame(img_input, columns=self.current_model.feature_name_)
            # 预测
            pred = self.current_model.predict(img_input)[0]
            self.result_label.config(text=f"识别结果: {pred}")
            # 概率预测
            if hasattr(self.current_model, 'predict_proba'):
                probs = self.current_model.predict_proba(img_input)[0]
                confidence = probs[pred]
                # 更新UI
                self.prob_label.config(text=f"置信度: {confidence:.2%}")
                self.update_confidence_display(confidence)
                # 显示候选数字
                top3 = sorted(enumerate(probs), key=lambda x: -x[1])[:3]
                self.update_candidates_display(top3)
            else:
                self.prob_label.config(text="该模型不支持概率输出")
                self.clear_confidence_display()
            self.status_var.set(f"识别完成: 数字 {pred}")
        except Exception as e:
            self.status_var.set(f"识别错误: {str(e)}")
            self.clear_confidence_display()

    def update_confidence_display(self, confidence):
        """更新置信度可视化"""
        self.confidence_canvas.delete("all")
        # 画布尺寸
        canvas_width = self.confidence_canvas.winfo_width() or 300
        # 绘制背景
        self.confidence_canvas.create_rectangle(
            10, 10, canvas_width - 10, 30, fill="#f0f0f0", outline="#cccccc"
        )
        # 绘制置信度条
        bar_width = int((canvas_width - 20) * confidence)
        color = self.get_confidence_color(confidence)
        self.confidence_canvas.create_rectangle(
            10, 10, 10 + bar_width, 30, fill=color, outline=""
        )
        # 绘制文本
        self.confidence_canvas.create_text(
            canvas_width / 2, 20, text=f"{confidence:.1%}", font=("Arial", 10, "bold")
        )
        # 绘制刻度
        for i in range(0, 11):
            x_pos = 10 + i * (canvas_width - 20) / 10
            self.confidence_canvas.create_line(x_pos, 30, x_pos, 35, width=1)
            if i % 2 == 0:
                self.confidence_canvas.create_text(x_pos, 45, text=f"{i*10}%", font=("Arial", 8))

    def get_confidence_color(self, confidence):
        """根据置信度获取颜色"""
        if confidence >= 0.9:
            return "#4CAF50"  # 绿色
        elif confidence >= 0.7:
            return "#FFC107"  # 黄色
        else:
            return "#F44336"  # 红色

    def update_candidates_display(self, candidates):
        """更新候选数字显示"""
        # 清空现有项
        for item in self.candidates_tree.get_children():
            self.candidates_tree.delete(item)
        # 添加新项
        for digit, prob in candidates:
            self.candidates_tree.insert("", tk.END, values=(digit, f"{prob:.2%}"))

    def show_samples(self):
        """显示样本图像"""
        plt.figure(figsize=(10, 4))
        for i in range(10):
            plt.subplot(2, 5, i + 1)
            sample_idx = np.where(self.digits.target == i)[0][0]
            plt.imshow(self.digits.images[sample_idx], cmap="gray")
            plt.title(f"数字 {i}", fontsize=9)
            plt.axis("off")
        plt.tight_layout()
        plt.show()

    def on_model_select(self, event):
        """模型选择事件处理"""
        selected_name = self.model_var.get()
        model_type = next(
            (k for k, v in self.available_models if v == selected_name), None
        )
        if model_type:
            self.change_model(model_type)

    def change_model(self, model_type):
        """切换模型"""
        model_name = MODEL_METADATA[model_type][0]
        # 从缓存加载
        if model_type in self.model_cache:
            self.current_model, self.scaler, accuracy, self.current_model_type = self.model_cache[model_type]
            self.model_label.config(text=f"{model_name} (准确率:{accuracy:.4f})")
            self.status_var.set(f"已加载模型: {model_name}")
            return
        self.status_var.set(f"正在加载模型: {model_name}...")
        self.root.update()  # 更新UI显示状态
        try:
            X_train, X_test, y_train, y_test = self.model_factory.get_split_data(self.digits)
            self.current_model, self.scaler, accuracy = self.model_factory.train_and_evaluate(
                model_type, X_train, y_train, X_test, y_test
            )
            self.current_model_type = model_type
            self.model_cache[model_type] = (self.current_model, self.scaler, accuracy, self.current_model_type)
            self.model_label.config(text=f"{model_name} (准确率:{accuracy:.4f})")
            self.status_var.set(f"模型加载完成: {model_name}, 准确率: {accuracy:.4f}")
            self.clear_canvas()
            # 更新性能表格
            self.load_performance_data()
        except Exception as e:
            self.status_var.set(f"模型加载失败: {str(e)}")
            self.model_label.config(text="模型加载失败")

    def init_default_model(self):
        """初始化默认模型"""
        self.model_var.set(self.available_models[0][1])
        self.change_model(self.available_models[0][0])

    def load_performance_data(self):
        """加载性能数据"""
        results = self.model_factory.evaluate_all_models(self.digits)
        # 清空表格
        for item in self.performance_tree.get_children():
            self.performance_tree.delete(item)
        # 添加数据
        for i, result in enumerate(results):
            tag = "highlight" if i == 0 else ""
            self.performance_tree.insert(
                "", tk.END, values=(result["模型名称"], result["准确率"]), tags=(tag,)
            )
        self.performance_tree.tag_configure("highlight", background="#e6f7ff")

    def show_performance_chart(self):
        """显示性能图表"""
        results = self.model_factory.evaluate_all_models(self.digits)
        # 提取有效结果
        valid_results = []
        for result in results:
            try:
                accuracy = float(result["准确率"])
                valid_results.append((result["模型名称"], accuracy))
            except ValueError:
                continue
        if not valid_results:
            messagebox.showinfo("提示", "没有可用的性能数据")
            return
        # 排序
        valid_results.sort(key=lambda x: x[1], reverse=True)
        models, accuracies = zip(*valid_results)
        # 创建图表
        plt.figure(figsize=(10, 5))
        bars = plt.barh(models, accuracies, color='#2196F3')
        plt.xlabel('准确率', fontsize=10)
        plt.ylabel('模型', fontsize=10)
        plt.title('模型性能对比', fontsize=12)
        plt.xlim(0, 1.05)
        # 添加数值标签
        for bar in bars:
            width = bar.get_width()
            plt.text(
                width + 0.01, bar.get_y() + bar.get_height() / 2,
                f'{width:.4f}', ha='left', va='center', fontsize=8
            )
        plt.tight_layout()
        plt.show()

    def save_as_training_sample(self):
        """保存为训练样本"""
        if not self.has_drawn:
            self.status_var.set("请先绘制数字再保存")
            return
        img_array = self.preprocess_image()
        if img_array is None:
            return
        # 弹出标签输入窗口
        label_window = tk.Toplevel(self.root)
        label_window.title("输入标签")
        label_window.geometry("300x120")
        label_window.transient(self.root)
        label_window.grab_set()
        tk.Label(
            label_window, text="请输入数字标签 (0-9):", font=("Arial", 10)
        ).pack(pady=10)
        entry = tk.Entry(label_window, font=("Arial", 12), width=5)
        entry.pack(pady=5)
        entry.focus_set()

        def save_with_label():
            try:
                label = int(entry.get())
                if label < 0 or label > 9:
                    raise ValueError("标签必须是0-9的数字")
                self.custom_data.append((img_array.tolist(), label))
                self.status_var.set(f"已保存数字 {label} (共 {len(self.custom_data)} 个样本)")
                label_window.destroy()
            except ValueError as e:
                self.status_var.set(f"保存错误: {str(e)}")

        tk.Button(label_window, text="保存", command=save_with_label, width=10).pack(pady=5)

    def save_all_training_data(self):
        """保存全部训练数据"""
        if not self.custom_data:
            self.status_var.set("没有训练数据可保存")
            return
        file_path = filedialog.asksaveasfilename(
            defaultextension=".csv",
            filetypes=[("CSV文件", "*.csv")],
            initialfile="custom_digits.csv",
            title="保存训练集"
        )
        if not file_path:
            return
        try:
            with open(file_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow([f'pixel{i}' for i in range(64)] + ['label'])
                for img_data, label in self.custom_data:
                    writer.writerow(img_data + [label])
            self.status_var.set(f"已保存 {len(self.custom_data)} 个样本到 {os.path.basename(file_path)}")
        except Exception as e:
            self.status_var.set(f"保存失败: {str(e)}")

    def load_training_data(self):
        """加载训练数据"""
        file_path = filedialog.askopenfilename(
            filetypes=[("CSV文件", "*.csv")], title="加载训练集"
        )
        if not file_path:
            return
        try:
            self.custom_data = []
            with open(file_path, 'r', newline='', encoding='utf-8') as f:
                reader = csv.reader(f)
                next(reader)  # 跳过标题
                for row in reader:
                    if len(row) != 65:
                        continue
                    img_data = [float(pixel) for pixel in row[:64]]
                    label = int(row[64])
                    self.custom_data.append((img_data, label))
            self.status_var.set(f"已加载 {len(self.custom_data)} 个样本")
        except Exception as e:
            self.status_var.set(f"加载失败: {str(e)}")

    def run(self):
        """运行应用"""
        self.root.mainloop()


if __name__ == "__main__":
    digits = load_digits()
    root = tk.Tk()
    app = HandwritingBoard(root, ModelFactory, digits)
    app.run()

Based on this code, please regenerate a reasonable version; some of the widgets currently overlap each other.
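Widget occlusion in a layout like this usually comes from packing several expanding frames into one column without giving the window manager resize rules. A minimal sketch of the usual fix (hypothetical helper `build_skeleton`; frame names only mirror the app above, this is not the full program): switch the top-level layout to `grid()` and assign explicit row/column weights so panels resize instead of stacking on top of each other.

```python
import tkinter as tk
from tkinter import ttk

def build_skeleton(root):
    """Lay out a fixed-width canvas column and an expanding control column.

    With grid() plus weights, resizing the window stretches the control
    column while the drawing canvas keeps its minimum size, so widgets
    reflow instead of overlapping."""
    root.columnconfigure(0, weight=0, minsize=320)  # canvas column: fixed floor
    root.columnconfigure(1, weight=1)               # control column: absorbs resize
    root.rowconfigure(0, weight=1)

    left_frame = ttk.Frame(root)
    left_frame.grid(row=0, column=0, sticky="nsw", padx=5, pady=5)

    right_frame = ttk.Frame(root)
    right_frame.grid(row=0, column=1, sticky="nsew", padx=5, pady=5)

    # Inside the right column, give only the results table a stretchy row,
    # so the labels above it keep their natural height.
    right_frame.columnconfigure(0, weight=1)
    right_frame.rowconfigure(2, weight=1)
    return left_frame, right_frame

if __name__ == "__main__":
    root = tk.Tk()
    root.geometry("900x650")
    build_skeleton(root)
    root.mainloop()
```

The key design choice is to avoid mixing `pack(expand=True)` siblings in one parent; one geometry manager per container, with weights declared once, keeps the panels from shadowing each other.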

import sys
import io
import os
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import scapy.all as scapy
from collections import defaultdict
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import pandas as pd
import tkinter as tk
from tkinter import filedialog, messagebox
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.dns import DNS, DNSRR
from datetime import datetime
import time
from sklearn.decomposition import PCA

# 设置Matplotlib中文显示支持
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'WenQuanYi Micro Hei']
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题

# 设置系统输出编码为UTF-8
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
if os.name == 'nt':  # Windows系统专用设置
    os.system('chcp 65001')  # 切换到UTF-8代码页

# ======================
# 文件选择对话框
# ======================
def select_files():
    """创建文件选择对话框"""
    root = tk.Tk()
    root.withdraw()  # 隐藏主窗口
    # 选择PCAP文件
    pcap_path = filedialog.askopenfilename(
        title="选择PCAP文件",
        filetypes=[("PCAP文件", "*.pcap"), ("PCAPNG文件", "*.pcapng"), ("所有文件", "*.*")]
    )
    if not pcap_path:
        print("未选择PCAP文件,程序退出")
        sys.exit(0)
    # 选择输出目录
    output_dir = filedialog.askdirectory(title="选择结果保存位置")
    if not output_dir:
        print("未选择输出目录,程序退出")
        sys.exit(0)
    return pcap_path, output_dir

# ======================
# 1. PCAP文件解析与流重组
# ======================
print("请选择PCAP文件和输出位置...")
pcap_path, output_dir = select_files()
print(f"解析文件: {pcap_path}")
print(f"保存位置: {output_dir}")

# 新增:DNS解析缓存
dns_cache = {}

# 流式解析大PCAP文件,避免内存溢出
def parse_large_pcap(pcap_path):
    flows = defaultdict(list)  # 使用字典存储网络流
    packet_count = 0  # 总包计数器
    ip_packet_count = 0  # IP包计数器
    global dns_cache  # 使用全局DNS缓存
    # 使用PcapReader流式读取,避免一次性加载大文件
    with scapy.PcapReader(pcap_path) as pcap_reader:
        for pkt in pcap_reader:
            packet_count += 1
            # 进度显示(每10000个包)
            if packet_count % 10000 == 0:
                print(f"已处理 {packet_count} 个包...")
            # 新增:DNS响应解析 - 修复索引越界问题
            if DNS in pkt and pkt[DNS].qr == 1:  # 只处理DNS响应
                # 获取实际可用的回答记录数量
                actual_ancount = len(pkt[DNS].an) if hasattr(pkt[DNS], 'an') else 0
                # 使用实际数量而不是ancount字段
                for i in range(min(pkt[DNS].ancount, actual_ancount)):
                    rr = pkt[DNS].an[i]
                    if isinstance(rr, DNSRR) and rr.type == 1:  # A记录
                        try:
                            ip = rr.rdata
                            # 正确处理域名格式
                            domain = rr.rrname.decode('utf-8', errors='ignore').rstrip('.')
                            # 添加到DNS缓存
                            if ip not in dns_cache:
                                dns_cache[ip] = set()
                            dns_cache[ip].add(domain)
                        except Exception as e:
                            # 忽略解析错误
                            pass
            # 仅处理IP层数据包
            if IP in pkt:
                ip_packet_count += 1
                ip_layer = pkt[IP]
                proto_num = ip_layer.proto  # 获取协议号
                # 初始化端口变量
                src_port, dst_port = 0, 0
                # 根据协议类型提取端口信息
                if proto_num == 6 and TCP in pkt:  # TCP协议
                    src_port = pkt[TCP].sport
                    dst_port = pkt[TCP].dport
                elif proto_num == 17 and UDP in pkt:  # UDP协议
                    src_port = pkt[UDP].sport
                    dst_port = pkt[UDP].dport
                elif proto_num == 1 and ICMP in pkt:  # ICMP协议
                    # 使用类型和代码代替端口
                    src_port = pkt[ICMP].type
                    dst_port = pkt[ICMP].code
                # 创建五元组作为流的唯一标识
                flow_key = (ip_layer.src, ip_layer.dst, src_port, dst_port, proto_num)
                flows[flow_key].append(pkt)  # 将包添加到对应流
    print(f"共处理 {packet_count} 个包,其中 {ip_packet_count} 个IP包")
    print(f"DNS缓存大小: {len(dns_cache)} 个IP到域名的映射")
    return flows

# 解析PCAP文件
flows = parse_large_pcap(pcap_path)
print(f"共识别出 {len(flows)} 个网络流")

# ======================
# 2. 特征工程
# ======================
print("正在提取流量特征...")
features = []  # 存储特征向量
valid_flows = []  # 存储有效流的标识
# 只初始化源时间列表
src_start_times = []
src_end_times = []
# 特征名称列表
feature_names = [
    '包数量', '总字节数', '平均包间隔', '包间隔标准差',
    '平均包长', '包长标准差', '最小包长', '最大包长', '上行比例'
]
min_packets = 5  # 流的最小包数阈值

for flow_key, packets in flows.items():
    packet_count = len(packets)
    # 过滤包数不足的流
    if packet_count < min_packets:
        continue
    # 1. 基础统计特征
    total_bytes = sum(len(p) for p in packets)  # 总字节数
    # 2. 时序特征(包到达间隔)
    timestamps = np.array([float(p.time) for p in packets])
    if packet_count > 1:
        iat = np.diff(timestamps)  # 包间隔
        mean_iat = np.mean(iat)  # 平均间隔
        std_iat = np.std(iat) if len(iat) > 1 else 0  # 间隔标准差
    else:
        mean_iat = std_iat = 0
    # 3. 包长特征
    pkt_lengths = np.array([len(p) for p in packets])
    mean_pkt_len = np.mean(pkt_lengths)  # 平均包长
    std_pkt_len = np.std(pkt_lengths) if packet_count > 1 else 0  # 包长标准差
    min_pkt_len = np.min(pkt_lengths)  # 最小包长
    max_pkt_len = np.max(pkt_lengths)  # 最大包长
    # 4. 方向特征(上行比例)
    src_ip = flow_key[0]  # 流源IP
    dst_ip = flow_key[1]  # 流目的IP
    # 判断数据包方向(1=上行,0=下行)
    directions = np.array([1 if p[IP].src == src_ip else 0 for p in packets])
    up_ratio = np.mean(directions)  # 上行包比例
    # 源IP发送的包的时间戳
    src_pkt_times = [float(p.time) for p in packets if p[IP].src == src_ip]
    # 计算源的起始和终止时间
    src_start = min(src_pkt_times) if src_pkt_times else np.nan
    src_end = max(src_pkt_times) if src_pkt_times else np.nan
    # 添加时间信息到列表
    src_start_times.append(src_start)
    src_end_times.append(src_end)
    # 添加特征向量
    features.append([
        packet_count, total_bytes, mean_iat, std_iat,
        mean_pkt_len, std_pkt_len, min_pkt_len, max_pkt_len, up_ratio
    ])
    valid_flows.append(flow_key)  # 记录有效流

if not features:
    # 错误处理:无有效流
    print("错误:没有提取到任何有效网络流")
    print("可能原因:")
    print("1. 所有流都少于5个包(尝试减小过滤阈值)")
    print("2. 文件格式不兼容(尝试用Wireshark打开验证)")
    print("3. 没有IP流量(检查网络捕获配置)")
    sys.exit(1)

feature_matrix = np.array(features)
print(f"提取了 {len(features)} 个有效流的 {len(feature_names)} 维特征")

# ======================
# 3. 数据标准化
# ======================
print("正在进行数据标准化...")
scaler = StandardScaler()
scaled_features = scaler.fit_transform(feature_matrix)  # Z-score标准化

# ======================
# 4. 肘部法则确定最佳K值
# ======================
print("使用肘部法则确定最佳聚类数...")
n_samples = scaled_features.shape[0]  # 样本数量
# 动态调整K值范围
if n_samples < 5:
    print(f"警告:样本数量较少({n_samples}),将使用简化聚类策略")
    k_range = range(1, min(11, n_samples + 1))  # K上限不超过样本数
else:
    k_range = range(1, 11)  # 测试1-10个聚类

sse = []  # 存储SSE(误差平方和)
kmeans_models = {}  # 存储不同K值的模型
for k in k_range:
    # 跳过超过样本数的K值
    if k > n_samples:
        print(f"跳过K={k}(超过样本数{n_samples})")
        continue
    # 训练K-means模型
    kmeans = KMeans(n_clusters=k, n_init=10, random_state=42)
    kmeans.fit(scaled_features)
    sse.append(kmeans.inertia_)  # 记录SSE
    kmeans_models[k] = kmeans  # 存储模型

# 样本不足时的处理
if len(sse) < 2:
    optimal_k = min(2, n_samples)  # 至少2类(不超过样本数)
    print(f"样本过少,直接设置K={optimal_k}")
else:
    # 绘制肘部法则图
    plt.figure(figsize=(10, 6))
    plt.plot(list(k_range)[:len(sse)], sse, 'bo-')
    plt.xlabel('聚类数量 $K$')
    plt.ylabel('SSE (误差平方和)')
    plt.title('肘部法则确定最佳聚类数')
    plt.grid(True)
    elbow_path = os.path.join(output_dir, 'elbow_method.png')
    plt.savefig(elbow_path, dpi=300)
    plt.close()
    print(f"肘部法则图已保存为 {elbow_path}")
    # 自动检测拐点(二阶差分最小值)
    if len(sse) >= 3:
        knee_point = np.argmin(np.diff(sse, 2)) + 2  # 计算二阶差分
        optimal_k = max(2, min(10, knee_point))  # 确保K在2-10之间
    else:
        optimal_k = max(2, min(3, n_samples))  # 样本少时选择2或3
    print(f"自动检测的最佳聚类数: K = {optimal_k}")

# ======================
# 5. K-means聚类
# ======================
print(f"正在进行K-means聚类 (K={optimal_k})...")
# 调整K值不超过样本数
if optimal_k > n_samples:
    optimal_k = max(1, n_samples)
    print(f"调整聚类数为样本数: K = {optimal_k}")

# 使用已有模型或新建模型
if optimal_k in kmeans_models:
    kmeans = kmeans_models[optimal_k]
    cluster_labels = kmeans.labels_
else:
    kmeans = KMeans(n_clusters=optimal_k, n_init=10, random_state=42)
    cluster_labels = kmeans.fit_predict(scaled_features)

# 计算轮廓系数(K>1时)
if optimal_k > 1:
    silhouette_avg = silhouette_score(scaled_features, cluster_labels)
    print(f"轮廓系数: {silhouette_avg:.4f} (接近1表示聚类效果良好)")
else:
    print("聚类数为1,无需计算轮廓系数")

# ======================
# 6. 结果分析与可视化(新增域名显示)
# ======================
# 创建结果DataFrame
result_df = pd.DataFrame({
    '源IP': [flow[0] for flow in valid_flows],
    '目的IP': [flow[1] for flow in valid_flows],
    '协议': [f"TCP(6)" if flow[4] == 6 else f"UDP(17)" for flow in valid_flows],
    '聚类标签': cluster_labels,
    '源起始时间': src_start_times,
    '源终止时间': src_end_times,
    '包数量': feature_matrix[:, 0],
    '总字节数': feature_matrix[:, 1]
})

# 新增:添加域名信息
def get_domains(ip):
    """获取IP对应的域名列表"""
    domains = dns_cache.get(ip, set())
    return ", ".join(domains) if domains else "N/A"

result_df['目的域名'] = result_df['目的IP'].apply(get_domains)

# 新增:时间格式化函数
def format_timestamp(ts):
    """将时间戳格式化为可读字符串"""
    if np.isnan(ts):
        return "N/A"
    return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # 保留毫秒

# 应用时间格式化
result_df['源起始时间'] = result_df['源起始时间'].apply(format_timestamp)
result_df['源终止时间'] = result_df['源终止时间'].apply(format_timestamp)

# 只保留需要的列
required_columns = ['源IP', '目的IP', '协议', '源起始时间', '源终止时间', '目的域名', '包数量', '总字节数']
result_df = result_df[required_columns]

# 生成结果文件名(带时间戳)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_filename = f'cluster_result_{timestamp}.csv'
output_path = os.path.join(output_dir, output_filename)
try:
    result_df.to_csv(output_path, index=False, encoding='utf_8_sig')
    print(f"聚类结果已保存为 {output_path}")
except Exception as e:
    print(f"保存结果文件失败: {str(e)}")
    # 尝试临时目录作为备选
    temp_dir = os.path.join(os.environ.get('TEMP', ''), 'netflow_clustering')
    os.makedirs(temp_dir, exist_ok=True)
    temp_path = os.path.join(temp_dir, output_filename)
    result_df.to_csv(temp_path, index=False, encoding='utf_8_sig')
    print(f"聚类结果已保存为 {temp_path}")

# 簇统计信息(使用聚类标签列)
if '聚类标签' in result_df.columns:
    cluster_stats = result_df.groupby('聚类标签').agg({
        '包数量': 'mean',
        '总字节数': 'mean'
    }).rename(columns={
        '包数量': '平均包数量',
        '总字节数': '平均总字节数'
    })
    print("\n各用户簇流量特征统计:")
    print(cluster_stats.round(2))
else:
    print("警告:结果中缺少聚类标签列,无法进行簇统计")

# 使用PCA降维可视化
pca = PCA(n_components=2)  # 降维到2D
principal_components = pca.fit_transform(scaled_features)
plt.figure(figsize=(12, 8))
scatter = plt.scatter(
    principal_components[:, 0], principal_components[:, 1],
    c=cluster_labels,  # 按簇着色
    cmap='viridis', alpha=0.6
)
# 标记簇中心
centers = pca.transform(kmeans.cluster_centers_)
plt.scatter(
    centers[:, 0], centers[:, 1],
    c='red', s=200, alpha=0.9, marker='X', edgecolor='black'
)
plt.colorbar(scatter, label='用户簇')
plt.xlabel('主成分 1')
plt.ylabel('主成分 2')
plt.title(f'校园网用户流量聚类 (K={optimal_k})')
plt.grid(alpha=0.3)
# 可视化文件保存到输出目录
vis_filename = f'cluster_visualization_{timestamp}.png'
vis_path = os.path.join(output_dir, vis_filename)
plt.savefig(vis_path, dpi=300)
plt.close()
print(f"聚类可视化图已保存为 {vis_path}")

# 最终输出
print("\n聚类完成! 结果文件已保存:")
print(f"- {elbow_path}: 肘部法则图")
print(f"- {output_path}: 详细聚类结果")
print(f"- {vis_path}: 聚类可视化")

Based on this program, draw the PCAP packet-parsing flow.
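Reading `parse_large_pcap` and the numbered sections of the script, the per-packet dispatch reduces to a linear pipeline. A text flowchart of that flow (derived directly from the code above, nothing added):

```
select_files()  -->  scapy.PcapReader streams packets one at a time
        |
        v
for each packet:
        |-- DNS response (qr == 1)?
        |       '--> for each A record: dns_cache[ip].add(domain)
        |-- has IP layer?
        |       |-- proto 6  (TCP)  -> ports = sport / dport
        |       |-- proto 17 (UDP)  -> ports = sport / dport
        |       '-- proto 1  (ICMP) -> "ports" = type / code
        |       then: flow_key = (src IP, dst IP, src port, dst port, proto)
        |             flows[flow_key].append(pkt)
        '-- otherwise: counted in packet_count only
        v
flows dict  -->  feature extraction (flows with >= 5 packets)
            -->  StandardScaler  -->  elbow method (SSE over K=1..10)
            -->  KMeans          -->  CSV result + PCA scatter plot
```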

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Lambda
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler
import os
import time

mpl.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Arial Unicode MS']
mpl.rcParams['axes.unicode_minus'] = False  # 关键修复:使用 ASCII 减号


class PINNModel(tf.keras.Model):
    def __init__(self, num_layers=4, hidden_units=32, **kwargs):
        super(PINNModel, self).__init__(**kwargs)
        self.dense_layers = [Dense(hidden_units, activation='sin') for _ in range(num_layers)]
        self.final_layer = Dense(1, activation='linear')
        # 添加带约束的物理参数
        self.k_raw = tf.Variable(0.01, trainable=True, dtype=tf.float32, name='k_raw')
        self.k = tf.math.sigmoid(self.k_raw) * 0.5  # 约束在0-0.5之间

    def call(self, inputs):
        t, h = inputs
        x = tf.concat([t, h], axis=1)
        for layer in self.dense_layers:
            x = layer(x)
        return self.final_layer(x)

    def physics_loss(self, t, h_current):
        """计算物理损失(基于离散渗流方程)"""
        # 预测下一时刻的水位
        h_next_pred = self([t, h_current])
        # 离散渗流方程: h_{t+1} = h_t - k * h_t (时间步长=1)
        residual = h_next_pred - h_current * (1 - self.k)
        return tf.reduce_mean(tf.square(residual))


class DamSeepageModel:
    def __init__(self, root):
        self.root = root
        self.root.title("大坝渗流预测模型(PINNs)")
        self.root.geometry("1200x800")
        # 初始化数据
        self.train_df = None  # 训练集
        self.test_df = None  # 测试集
        self.model = None
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.evaluation_metrics = {}
        # 创建主界面
        self.create_widgets()

    def create_widgets(self):
        # 创建主框架
        main_frame = ttk.Frame(self.root, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)
        # 左侧控制面板
        control_frame = ttk.LabelFrame(main_frame, text="模型控制", padding=10)
        control_frame.pack(side=tk.LEFT, fill=tk.Y, padx=5, pady=5)
        # 文件选择部分
        file_frame = ttk.LabelFrame(control_frame, text="数据文件", padding=10)
        file_frame.pack(fill=tk.X, pady=5)
        # 训练集选择
        ttk.Label(file_frame, text="训练集:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.train_file_var = tk.StringVar()
        ttk.Entry(file_frame, textvariable=self.train_file_var, width=30, state='readonly').grid(row=0, column=1, padx=5)
        ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("train")).grid(row=0, column=2)
        # 测试集选择
        ttk.Label(file_frame, text="测试集:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.test_file_var = tk.StringVar()
        ttk.Entry(file_frame, textvariable=self.test_file_var, width=30, state='readonly').grid(row=1, column=1, padx=5)
        ttk.Button(file_frame, text="选择文件", command=lambda: self.select_file("test")).grid(row=1, column=2)
        # PINNs参数设置
        param_frame = ttk.LabelFrame(control_frame, text="PINNs参数", padding=10)
        param_frame.pack(fill=tk.X, pady=10)
        # 验证集切分比例
        ttk.Label(param_frame, text="验证集比例:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.split_ratio_var = tk.DoubleVar(value=0.2)
        ttk.Spinbox(param_frame, from_=0, to=1, increment=0.05,
                    textvariable=self.split_ratio_var, width=10).grid(row=0, column=1, padx=5)
        # 隐藏层数量
        ttk.Label(param_frame, text="网络层数:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.num_layers_var = tk.IntVar(value=4)
        ttk.Spinbox(param_frame, from_=2, to=8, increment=1,
                    textvariable=self.num_layers_var, width=10).grid(row=1, column=1, padx=5)
        # 每层神经元数量
        ttk.Label(param_frame, text="神经元数/层:").grid(row=2, column=0, sticky=tk.W, pady=5)
        self.hidden_units_var = tk.IntVar(value=32)
        ttk.Spinbox(param_frame, from_=16, to=128, increment=4,
                    textvariable=self.hidden_units_var, width=10).grid(row=2, column=1, padx=5)
        # 训练轮次
        ttk.Label(param_frame, text="训练轮次:").grid(row=3, column=0, sticky=tk.W, pady=5)
        self.epochs_var = tk.IntVar(value=500)
        ttk.Spinbox(param_frame, from_=100, to=2000, increment=100,
                    textvariable=self.epochs_var, width=10).grid(row=3, column=1, padx=5)
        # 物理损失权重
        ttk.Label(param_frame, text="物理损失权重:").grid(row=4, column=0, sticky=tk.W, pady=5)
        self.physics_weight_var = tk.DoubleVar(value=0.5)
        ttk.Spinbox(param_frame, from_=0.1, to=1.0, increment=0.1,
                    textvariable=self.physics_weight_var, width=10).grid(row=4, column=1, padx=5)
        # 控制按钮
        btn_frame = ttk.Frame(control_frame)
        btn_frame.pack(fill=tk.X, pady=10)
        ttk.Button(btn_frame, text="训练模型", command=self.train_model).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="预测结果", command=self.predict).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="保存结果", command=self.save_results).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="重置", command=self.reset).pack(side=tk.RIGHT, padx=5)
        # 状态栏
        self.status_var = tk.StringVar(value="就绪")
        status_bar = ttk.Label(control_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        status_bar.pack(fill=tk.X, side=tk.BOTTOM)
        # 右侧结果显示区域
        result_frame = ttk.Frame(main_frame)
        result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5)
        # 创建标签页
        self.notebook = ttk.Notebook(result_frame)
        self.notebook.pack(fill=tk.BOTH, expand=True)
        # 损失曲线标签页
        self.loss_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.loss_frame, text="训练损失")
        # 预测结果标签页
        self.prediction_frame = ttk.Frame(self.notebook)
        self.notebook.add(self.prediction_frame, text="预测结果")
        # 指标显示
        self.metrics_var = tk.StringVar()
        metrics_label = ttk.Label(
            self.prediction_frame, textvariable=self.metrics_var,
            font=('TkDefaultFont', 10, 'bold'), relief='ridge', padding=5
        )
        metrics_label.pack(fill=tk.X, padx=5, pady=5)
        # 初始化绘图区域
        self.fig, self.ax = plt.subplots(figsize=(10, 6))
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.prediction_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
        # 损失曲线画布
        self.loss_fig, self.loss_ax = plt.subplots(figsize=(10, 4))
        self.loss_canvas = FigureCanvasTkAgg(self.loss_fig, master=self.loss_frame)
        self.loss_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def select_file(self, file_type):
        """选择Excel文件"""
        file_path = filedialog.askopenfilename(
            title=f"选择{file_type}集Excel文件",
            filetypes=[("Excel文件", "*.xlsx *.xls"), ("所有文件", "*.*")]
        )
        if file_path:
            try:
                df = pd.read_excel(file_path)
                # 时间特征处理
                time_features = ['year', 'month', 'day']
                missing_time_features = [feat for feat in time_features if feat not in df.columns]
                if missing_time_features:
                    messagebox.showerror("列名错误",
                                         f"Excel文件缺少预处理后的时间特征列: {', '.join(missing_time_features)}")
                    return
                # 创建时间戳列 (增强兼容性)
                time_cols = ['year', 'month', 'day']
                if 'hour' in df.columns:
                    time_cols.append('hour')
                if 'minute' in df.columns:
                    time_cols.append('minute')
                if 'second' in df.columns:
                    time_cols.append('second')
                # 填充缺失的时间单位
                for col in ['hour', 'minute', 'second']:
                    if col not in df.columns:
                        df[col] = 0
                df['datetime'] = pd.to_datetime(df[time_cols])
                # 设置时间索引
                df = df.set_index('datetime')
                # 计算相对时间(天)
                df['days'] = (df.index - df.index[0]).days
                # 保存数据
                if file_type == "train":
                    self.train_df = df
                    self.train_file_var.set(os.path.basename(file_path))
                    self.status_var.set(f"已加载训练集: {len(self.train_df)}条数据")
                else:
                    self.test_df = df
                    self.test_file_var.set(os.path.basename(file_path))
                    self.status_var.set(f"已加载测试集: {len(self.test_df)}条数据")
            except Exception as e:
                messagebox.showerror("文件错误", f"读取文件失败: {str(e)}")

    def calculate_metrics(self, y_true, y_pred):
        """计算评估指标"""
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_true, y_pred)
        non_zero_idx = np.where(y_true != 0)[0]
        if len(non_zero_idx) > 0:
            mape = np.mean(np.abs((y_true[non_zero_idx] - y_pred[non_zero_idx]) / y_true[non_zero_idx])) * 100
        else:
            mape = float('nan')
        r2 = r2_score(y_true, y_pred)
        return {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'MAPE': mape, 'R2': r2}

    def train_model(self):
        """训练PINNs模型(带早停机制+训练指标监控,无指标绘图)"""
        if self.train_df is None:
            messagebox.showwarning("警告", "请先选择训练集文件")
            return
        try:
            self.status_var.set("正在预处理数据...")
            self.root.update()
            # 从训练集中切分训练子集和验证子集(时间顺序切分)
            split_ratio = 1 -
self.split_ratio_var.get() split_idx = int(len(self.train_df) * split_ratio) train_subset = self.train_df.iloc[:split_idx] valid_subset = self.train_df.iloc[split_idx:] # 检查数据量是否足够 if len(train_subset) < 2 or len(valid_subset) < 2: messagebox.showerror("数据错误", "训练集数据量不足(至少需要2个时间步)") return # 数据预处理(训练子集拟合scaler,验证子集用相同scaler) train_subset_scaled = self.scaler.fit_transform(train_subset[['水位']]) valid_subset_scaled = self.scaler.transform(valid_subset[['水位']]) # 准备训练数据(原始值用于指标计算) t_train = train_subset['days'].values[1:].reshape(-1, 1).astype(np.float32) h_train = train_subset_scaled[:-1].astype(np.float32) h_next_train_scaled = train_subset_scaled[1:].astype(np.float32) # 归一化后的标签 h_next_train_true = train_subset['水位'].values[1:].reshape(-1, 1) # 原始真实值(反归一化前) # 准备验证数据(原始值用于指标计算) t_valid = valid_subset['days'].values[1:].reshape(-1, 1).astype(np.float32) h_valid = valid_subset_scaled[:-1].astype(np.float32) h_next_valid_scaled = valid_subset_scaled[1:].astype(np.float32) # 归一化后的标签 h_next_valid_true = valid_subset['水位'].values[1:].reshape(-1, 1) # 原始真实值 # 创建模型和优化器 self.model = PINNModel( num_layers=self.num_layers_var.get(), hidden_units=self.hidden_units_var.get() ) optimizer = Adam(learning_rate=0.001) # 构建训练/验证数据集 train_dataset = tf.data.Dataset.from_tensor_slices(((t_train, h_train), h_next_train_scaled)) train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32) valid_dataset = tf.data.Dataset.from_tensor_slices(((t_valid, h_valid), h_next_valid_scaled)) valid_dataset = valid_dataset.batch(32) # 验证集无需shuffle # 损失记录(新增指标记录) train_data_loss_history = [] physics_loss_history = [] valid_data_loss_history = [] # 新增:训练集和验证集的指标历史(MSE, RMSE等) train_metrics_history = [] # 每个元素是字典(如{'MSE':..., 'RMSE':...}) valid_metrics_history = [] # 早停机制参数 patience = int(self.epochs_var.get() / 3) min_delta = 1e-4 best_valid_loss = float('inf') wait = 0 best_epoch = 0 best_weights = None start_time = time.time() # 自定义训练循环(新增指标计算) for epoch in range(self.epochs_var.get()): # 训练阶段 
epoch_train_data_loss = [] epoch_physics_loss = [] # 收集训练预测值(归一化后) train_pred_scaled = [] for step, ((t_batch, h_batch), h_next_batch) in enumerate(train_dataset): with tf.GradientTape() as tape: h_pred = self.model([t_batch, h_batch]) data_loss = tf.reduce_mean(tf.square(h_next_batch - h_pred)) physics_loss = self.model.physics_loss(t_batch, h_batch) loss = data_loss + self.physics_weight_var.get() * physics_loss grads = tape.gradient(loss, self.model.trainable_variables) optimizer.apply_gradients(zip(grads, self.model.trainable_variables)) epoch_train_data_loss.append(data_loss.numpy()) epoch_physics_loss.append(physics_loss.numpy()) train_pred_scaled.append(h_pred.numpy()) # 保存训练预测值(归一化) # 合并训练预测值(归一化后) train_pred_scaled = np.concatenate(train_pred_scaled, axis=0) # 反归一化得到原始预测值 train_pred_true = self.scaler.inverse_transform(train_pred_scaled) # 计算训练集指标(使用原始真实值和预测值) train_metrics = self.calculate_metrics( y_true=h_next_train_true.flatten(), y_pred=train_pred_true.flatten() ) train_metrics_history.append(train_metrics) # 验证阶段 epoch_valid_data_loss = [] valid_pred_scaled = [] for ((t_v_batch, h_v_batch), h_v_next_batch) in valid_dataset: h_v_pred = self.model([t_v_batch, h_v_batch]) valid_data_loss = tf.reduce_mean(tf.square(h_v_next_batch - h_v_pred)) epoch_valid_data_loss.append(valid_data_loss.numpy()) valid_pred_scaled.append(h_v_pred.numpy()) # 保存验证预测值(归一化) # 合并验证预测值(归一化后) valid_pred_scaled = np.concatenate(valid_pred_scaled, axis=0) # 反归一化得到原始预测值 valid_pred_true = self.scaler.inverse_transform(valid_pred_scaled) # 计算验证集指标(使用原始真实值和预测值) valid_metrics = self.calculate_metrics( y_true=h_next_valid_true.flatten(), y_pred=valid_pred_true.flatten() ) valid_metrics_history.append(valid_metrics) # 计算平均损失 avg_train_data_loss = np.mean(epoch_train_data_loss) avg_physics_loss = np.mean(epoch_physics_loss) avg_valid_data_loss = np.mean(epoch_valid_data_loss) # 记录损失 train_data_loss_history.append(avg_train_data_loss) physics_loss_history.append(avg_physics_loss) 
valid_data_loss_history.append(avg_valid_data_loss) # 早停机制逻辑(与原代码一致) current_valid_loss = avg_valid_data_loss if current_valid_loss < best_valid_loss - min_delta: best_valid_loss = current_valid_loss best_epoch = epoch + 1 wait = 0 best_weights = self.model.get_weights() else: wait += 1 if wait >= patience: self.status_var.set(f"触发早停!最佳轮次: {best_epoch},最佳验证损失: {best_valid_loss:.4f}") if best_weights is not None: self.model.set_weights(best_weights) break # 更新状态(新增指标显示) if epoch % 10 == 0: # 提取当前训练/验证的关键指标(如RMSE) train_rmse = train_metrics['RMSE'] valid_rmse = valid_metrics['RMSE'] train_r2 = train_metrics['R2'] valid_r2 = valid_metrics['R2'] k_value = self.model.k.numpy() elapsed = time.time() - start_time self.status_var.set( f"训练中 | 轮次: {epoch + 1}/{self.epochs_var.get()} | " f"训练RMSE: {train_rmse:.4f} | 验证RMSE: {valid_rmse:.4f} | " f"训练R²: {train_r2:.4f} | 验证R²: {valid_r2:.4f} | " f"k: {k_value:.6f} | 时间: {elapsed:.1f}秒 | 早停等待: {wait}/{patience}" ) self.root.update() # 绘制损失曲线(仅保留原始损失曲线) self.loss_ax.clear() epochs_range = range(1, len(train_data_loss_history) + 1) self.loss_ax.plot(epochs_range, train_data_loss_history, 'b-', label='训练数据损失') self.loss_ax.plot(epochs_range, physics_loss_history, 'r--', label='物理损失') self.loss_ax.plot(epochs_range, valid_data_loss_history, 'g-.', label='验证数据损失') self.loss_ax.set_title('PINNs训练与验证损失') self.loss_ax.set_xlabel('轮次') self.loss_ax.set_ylabel('损失', rotation=0) self.loss_ax.legend() self.loss_ax.grid(True, alpha=0.3) self.loss_ax.set_yscale('log') self.loss_canvas.draw() # 训练完成提示(保留指标总结) elapsed = time.time() - start_time if wait >= patience: completion_msg = ( f"早停触发 | 最佳轮次: {best_epoch} | 最佳验证损失: {best_valid_loss:.4f} | " f"最佳验证RMSE: {valid_metrics_history[best_epoch - 1]['RMSE']:.4f} | " f"总时间: {elapsed:.1f}秒" ) else: completion_msg = ( f"训练完成 | 总轮次: {self.epochs_var.get()} | " f"最终训练RMSE: {train_metrics_history[-1]['RMSE']:.4f} | " f"最终验证RMSE: {valid_metrics_history[-1]['RMSE']:.4f} | " f"最终训练R²: 
{train_metrics_history[-1]['R2']:.4f} | " f"最终验证R²: {valid_metrics_history[-1]['R2']:.4f} | " f"总时间: {elapsed:.1f}秒" ) # 在训练循环结束后,保存训练历史 self.train_history = { 'train_data_loss': train_data_loss_history, 'physics_loss': physics_loss_history, 'valid_data_loss': valid_data_loss_history, 'train_metrics': train_metrics_history, 'valid_metrics': valid_metrics_history } self.status_var.set(completion_msg) messagebox.showinfo("训练完成", f"PINNs模型训练成功完成!\n{completion_msg}") except Exception as e: messagebox.showerror("训练错误", f"模型训练失败:\n{str(e)}") self.status_var.set("训练失败") def predict(self): """使用PINNs模型进行预测(优化时间轴刻度与网格线)""" if self.model is None: messagebox.showwarning("警告", "请先训练模型") return if self.test_df is None: messagebox.showwarning("警告", "请先选择测试集文件") return try: self.status_var.set("正在生成预测...") self.root.update() # 预处理测试数据 test_scaled = self.scaler.transform(self.test_df[['水位']]) # 准备时间特征 t_test = self.test_df['days'].values.reshape(-1, 1).astype(np.float32) # 递归预测 predictions = [] for i in range(len(t_test)): h_current = np.array([[test_scaled[i][0]]]).astype(np.float32) h_pred = self.model([t_test[i:i + 1], h_current]) predictions.append(h_pred.numpy()[0][0]) # 反归一化 predictions = np.array(predictions).reshape(-1, 1) predictions = self.scaler.inverse_transform(predictions) actual_values = self.scaler.inverse_transform(test_scaled) # 创建时间索引(确保为DatetimeIndex) test_time = self.test_df.index # 假设为pandas DatetimeIndex类型 # 清除现有图表 self.ax.clear() # 绘制结果 self.ax.plot(test_time, actual_values, 'b-', label='真实值') self.ax.plot(test_time, predictions, 'r--', label='预测值') self.ax.set_title('大坝渗流水位预测结果(PINNs)') self.ax.set_xlabel('时间') self.ax.set_ylabel('测压管水位', rotation=0) self.ax.legend() # 添加网格和样式(优化时间轴) import matplotlib.dates as mdates # 导入日期刻度工具 # 设置x轴刻度:主刻度(年份)和次要刻度(每2个月) # 主刻度:每年1月1日(或数据起始年的第一个时间点) self.ax.xaxis.set_major_locator(mdates.YearLocator()) self.ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y')) # 仅显示年份 # 次要刻度:每2个月(如2月、4月、6月...) 
self.ax.xaxis.set_minor_locator(mdates.MonthLocator(interval=2)) # 添加次要网格线(每2个月的竖直虚线) self.ax.grid(which='minor', axis='x', linestyle='--', color='gray', alpha=0.3) # 主要网格线(可选,保持原有水平网格) self.ax.grid(which='major', axis='y', linestyle='-', color='lightgray', alpha=0.5) # 优化刻度标签显示(避免重叠) self.ax.tick_params(axis='x', which='major', rotation=0, labelsize=10) self.ax.tick_params(axis='x', which='minor', length=3) # 次要刻度线长度 # 计算并显示评估指标(保持原有逻辑) self.evaluation_metrics = self.calculate_metrics( actual_values.flatten(), predictions.flatten() ) metrics_text = ( f"MSE: {self.evaluation_metrics['MSE']:.4f} | " f"RMSE: {self.evaluation_metrics['RMSE']:.4f} | " f"MAE: {self.evaluation_metrics['MAE']:.4f} | " f"MAPE: {self.evaluation_metrics['MAPE']:.2f}% | " f"R²: {self.evaluation_metrics['R2']:.4f}" ) # 更新文本标签 self.metrics_var.set(metrics_text) # 在图表上添加指标(位置调整,避免覆盖时间刻度) self.ax.text( 0.5, 1.08, metrics_text, # 略微上移避免与网格重叠 transform=self.ax.transAxes, ha='center', fontsize=10, bbox=dict(facecolor='white', alpha=0.8) ) # 调整布局(重点优化时间轴边距) plt.tight_layout(pad=2.0) # 增加底部边距避免刻度标签被截断 self.canvas.draw() # 保存预测结果(保持原有逻辑) self.predictions = predictions self.actual_values = actual_values self.test_time = test_time self.status_var.set("预测完成,结果已显示") except Exception as e: messagebox.showerror("预测错误", f"预测失败:\n{str(e)}") self.status_var.set("预测失败") def save_results(self): """保存预测结果和训练历史数据""" if not hasattr(self, 'predictions') or not hasattr(self, 'train_history'): messagebox.showwarning("警告", "请先生成预测结果并完成训练") return # 选择保存路径 save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx"), ("所有文件", "*.*")], title="保存结果" ) if not save_path: return try: # 1. 创建预测结果DataFrame result_df = pd.DataFrame({ '时间': self.test_time, '实际水位': self.actual_values.flatten(), '预测水位': self.predictions.flatten() }) # 2. 创建评估指标DataFrame metrics_df = pd.DataFrame([self.evaluation_metrics]) # 3. 
创建训练历史DataFrame history_data = { '轮次': list(range(1, len(self.train_history['train_data_loss']) + 1)), '训练数据损失': self.train_history['train_data_loss'], '物理损失': self.train_history['physics_loss'], '验证数据损失': self.train_history['valid_data_loss'] } # 添加训练集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'训练集_{metric}'] = [item[metric] for item in self.train_history['train_metrics']] # 添加验证集指标 for metric in ['MSE', 'RMSE', 'MAE', 'MAPE', 'R2']: history_data[f'验证集_{metric}'] = [item[metric] for item in self.train_history['valid_metrics']] history_df = pd.DataFrame(history_data) # 保存到Excel with pd.ExcelWriter(save_path) as writer: result_df.to_excel(writer, sheet_name='预测结果', index=False) metrics_df.to_excel(writer, sheet_name='评估指标', index=False) history_df.to_excel(writer, sheet_name='训练历史', index=False) # 保存图表 chart_path = os.path.splitext(save_path)[0] + "_chart.png" self.fig.savefig(chart_path, dpi=300) # 保存损失曲线图 loss_path = os.path.splitext(save_path)[0] + "_loss.png" self.loss_fig.savefig(loss_path, dpi=300) self.status_var.set(f"结果已保存至: {os.path.basename(save_path)}") messagebox.showinfo("保存成功", f"预测结果和图表已保存至:\n" f"主文件: {save_path}\n" f"预测图表: {chart_path}\n" f"损失曲线: {loss_path}") except Exception as e: messagebox.showerror("保存错误", f"保存结果失败:\n{str(e)}") def reset(self): """重置程序状态""" self.train_df = None self.test_df = None self.model = None self.train_file_var.set("") self.test_file_var.set("") # 清除训练历史 if hasattr(self, 'train_history'): del self.train_history # 清除图表 if hasattr(self, 'ax'): self.ax.clear() if hasattr(self, 'loss_ax'): self.loss_ax.clear() # 重绘画布 if hasattr(self, 'canvas'): self.canvas.draw() if hasattr(self, 'loss_canvas'): self.loss_canvas.draw() # 清除状态 self.status_var.set("已重置,请选择新数据") # 清除预测结果 if hasattr(self, 'predictions'): del self.predictions # 清除指标文本 if hasattr(self, 'metrics_var'): self.metrics_var.set("") messagebox.showinfo("重置", "程序已重置,可以开始新的分析") if __name__ == "__main__": root = tk.Tk() app = DamSeepageModel(root) 
    root.mainloop()

This code trains a model on dam seepage water levels and makes predictions. Now please change the hidden-layer activation function in the PINNModel class to swish, and add regularization to prevent overfitting.
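The PINNModel class itself is not included in the excerpt above, so the following is only a minimal sketch of how its hidden layers could be switched to swish with L2 weight decay and dropout added. The layer structure, the `l2_weight` and `dropout_rate` parameters, and the initial value of the learnable physical parameter `k` are all assumptions here, not the original definition (the `physics_loss` method is omitted because its PDE residual is not shown):

```python
import tensorflow as tf
from tensorflow.keras import layers, regularizers


class PINNModel(tf.keras.Model):
    """Sketch of a PINN with swish hidden layers, L2 weight decay and dropout."""

    def __init__(self, num_layers=4, hidden_units=32,
                 l2_weight=1e-4, dropout_rate=0.1):
        super().__init__()
        self.hidden_layers = []
        for _ in range(num_layers):
            # swish activation instead of tanh/relu; L2 penalty on every kernel
            self.hidden_layers.append(
                layers.Dense(hidden_units,
                             activation='swish',
                             kernel_regularizer=regularizers.l2(l2_weight))
            )
            # Dropout as a second regularizer (active only when training=True)
            self.hidden_layers.append(layers.Dropout(dropout_rate))
        self.out = layers.Dense(1)
        # Learnable physical parameter k referenced by the training loop
        # (initial value 0.01 is an assumption)
        self.k = tf.Variable(0.01, trainable=True, dtype=tf.float32)

    def call(self, inputs, training=False):
        t, h = inputs
        x = tf.concat([t, h], axis=1)
        for layer in self.hidden_layers:
            # the training flag toggles Dropout; Dense layers ignore it
            x = layer(x, training=training)
        return self.out(x)
```

Note that `kernel_regularizer` penalties are collected in `model.losses` but are not applied automatically in a custom training loop: inside the `GradientTape` block, the total loss would need to become something like `loss = data_loss + self.physics_weight_var.get() * physics_loss + tf.add_n(self.model.losses)`, and the forward pass during training should be called with `training=True` so that dropout is active (and with `training=False` at prediction time).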
