import os import pandas as pd import tkinter as tk from tkinter import ttk, filedialog, scrolledtext, messagebox from tkinter.colorchooser import askcolor from difflib import SequenceMatcher import re import openpyxl import threading import numpy as np from openpyxl.utils import get_column_letter import xlrd import gc import hashlib import json import tempfile from concurrent.futures import ThreadPoolExecutor, as_completed import unicodedata from datetime import datetime class EnhancedSignalComparator: def __init__(self, root): self.root = root self.root.title("增强版信号功能对比工具") self.root.geometry("1200x800") self.root.configure(bg="#f0f0f0") # 初始化变量 self.folder_path = tk.StringVar() self.search_text = tk.StringVar() self.files = [] self.results = {} # 存储信号对比结果 self.highlight_color = "#FFD700" # 默认高亮色 self.search_running = False self.stop_requested = False self.cache_dir = os.path.join(tempfile.gettempdir(), "excel_cache") self.file_cache = {} # 文件缓存 self.column_cache = {} # 列名缓存 self.max_workers = 4 # 最大并发线程数 # 创建缓存目录 os.makedirs(self.cache_dir, exist_ok=True) # 创建界面 self.create_widgets() self.log_file = "comparator.log" self.setup_logging() def setup_logging(self): """初始化日志系统""" with open(self.log_file, "w", encoding="utf-8") as log_file: log_file.write(f"{datetime.now().isoformat()} - 日志初始化\n") def log(self, message): """记录日志""" timestamp = datetime.now().isoformat() log_entry = f"{timestamp} - {message}\n" # 控制台输出 print(log_entry.strip()) # 文件记录 with open(self.log_file, "a", encoding="utf-8") as log_file: log_file.write(log_entry) # 状态栏显示(缩短版本) if len(message) > 60: self.status_var.set(message[:57] + "...") else: self.status_var.set(message) def create_widgets(self): # 顶部控制面板 control_frame = ttk.Frame(self.root, padding=10) control_frame.pack(fill=tk.X) # 文件夹选择 ttk.Label(control_frame, text="选择文件夹:").grid(row=0, column=0, sticky=tk.W) folder_entry = ttk.Entry(control_frame, textvariable=self.folder_path, width=50) folder_entry.grid(row=0, column=1, padx=5, sticky=tk.EW) ttk.Button(control_frame, text="浏览...", command=self.browse_folder).grid(row=0, column=2) # 搜索输入 ttk.Label(control_frame, text="搜索信号:").grid(row=1, column=0, sticky=tk.W, pady=(10,0)) search_entry = ttk.Entry(control_frame, textvariable=self.search_text, width=50) search_entry.grid(row=1, column=1, padx=5, pady=(10,0), sticky=tk.EW) search_entry.bind("<Return>", lambda event: self.start_search_thread()) ttk.Button(control_frame, text="搜索", command=self.start_search_thread).grid(row=1, column=2, pady=(10,0)) ttk.Button(control_frame, text="停止", command=self.stop_search).grid(row=1, column=3, pady=(10,0), padx=5) # 高级选项 ttk.Label(control_frame, text="并发线程:").grid(row=2, column=0, sticky=tk.W, pady=(10,0)) self.thread_var = tk.StringVar(value="4") ttk.Combobox(control_frame, textvariable=self.thread_var, values=["1", "2", "4", "8"], width=5).grid(row=2, column=1, sticky=tk.W, padx=5, pady=(10,0)) # 文件过滤 ttk.Label(control_frame, text="文件过滤:").grid(row=2, column=2, sticky=tk.W, pady=(10,0)) self.filter_var = tk.StringVar(value="*.xlsx;*.xlsm;*.xls") ttk.Entry(control_frame, textvariable=self.filter_var, width=20).grid(row=2, column=3, sticky=tk.W, padx=5, pady=(10,0)) # 高亮颜色选择 ttk.Label(control_frame, text="高亮颜色:").grid(row=3, column=0, sticky=tk.W, pady=(10,0)) self.color_btn = tk.Button(control_frame, bg=self.highlight_color, width=3, command=self.choose_color) self.color_btn.grid(row=3, column=1, sticky=tk.W, padx=5, pady=(10,0)) # 进度条 self.progress = ttk.Progressbar(control_frame, orient="horizontal", length=200, mode="determinate") self.progress.grid(row=3, column=2, columnspan=2, sticky=tk.EW, padx=5, pady=(10,0)) # 结果标签 self.result_label = ttk.Label(control_frame, text="") self.result_label.grid(row=3, column=4, sticky=tk.W, padx=5, pady=(10,0)) # 对比面板 notebook = ttk.Notebook(self.root) notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 表格视图 self.table_frame = ttk.Frame(notebook) notebook.add(self.table_frame, text="表格视图") # 文本对比视图 self.text_frame = ttk.Frame(notebook) notebook.add(self.text_frame, text="行内容对比") # 状态栏 self.status_var = tk.StringVar() status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W) status_bar.pack(side=tk.BOTTOM, fill=tk.X) # 初始化表格和文本区域 self.init_table_view() self.init_text_view() def init_table_view(self): """初始化表格视图""" # 创建树状表格 columns = ("信号", "文件", "行内容摘要") self.tree = ttk.Treeview(self.table_frame, columns=columns, show="headings") # 设置列标题 for col in columns: self.tree.heading(col, text=col) self.tree.column(col, width=200, anchor=tk.W) # 添加滚动条 scrollbar = ttk.Scrollbar(self.table_frame, orient=tk.VERTICAL, command=self.tree.yview) self.tree.configure(yscrollcommand=scrollbar.set) self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # 绑定选择事件 self.tree.bind("<<TreeviewSelect>>", self.on_table_select) def init_text_view(self): """初始化文本对比视图""" self.text_panes = {} self.text_frame.columnconfigure(0, weight=1) self.text_frame.rowconfigure(0, weight=1) # 创建对比容器 self.compare_container = ttk.Frame(self.text_frame) self.compare_container.grid(row=0, column=0, sticky="nsew", padx=5, pady=5) # 添加差异高亮按钮 btn_frame = ttk.Frame(self.text_frame) btn_frame.grid(row=1, column=0, sticky="ew", padx=5, pady=5) ttk.Button(btn_frame, text="高亮显示差异", command=self.highlight_differences).pack(side=tk.LEFT) ttk.Button(btn_frame, text="导出差异报告", command=self.export_report).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="清除缓存", command=self.clear_cache).pack(side=tk.LEFT, padx=5) ttk.Button(btn_frame, text="手动指定列名", command=self.manual_column_select).pack(side=tk.LEFT, padx=5) def browse_folder(self): """选择文件夹""" folder = filedialog.askdirectory(title="选择包含Excel文件的文件夹") if folder: self.folder_path.set(folder) self.load_files() def load_files(self): """加载文件夹中的Excel文件(优化特殊字符处理)""" folder = self.folder_path.get() if not folder or not os.path.isdir(folder): return # 获取文件过滤模式 filter_patterns = self.filter_var.get().split(';') self.files = [] for file in os.listdir(folder): file_path = os.path.join(folder, file) # 跳过临时文件 if file.startswith('~$'): continue # 检查文件扩展名 file_lower = file.lower() matched = False for pattern in filter_patterns: # 移除通配符并转换为小写 ext = pattern.replace('*', '').lower() if file_lower.endswith(ext): matched = True break if matched: # 规范化文件名处理特殊字符 normalized_path = self.normalize_file_path(file_path) if normalized_path and os.path.isfile(normalized_path): self.files.append(normalized_path) self.status_var.set(f"找到 {len(self.files)} 个Excel文件") def normalize_file_path(self, path): """规范化文件路径,处理特殊字符""" try: # 尝试直接访问文件 if os.path.exists(path): return path # 尝试Unicode规范化 normalized = unicodedata.normalize('NFC', path) if os.path.exists(normalized): return normalized # 尝试不同编码方案 encodings = ['utf-8', 'shift_jis', 'euc-jp', 'cp932'] for encoding in encodings: try: decoded = path.encode('latin1').decode(encoding) if os.path.exists(decoded): return decoded except: continue # 最终尝试原始路径 return path except Exception as e: self.status_var.set(f"文件路径处理错误: {str(e)}") return path def get_file_hash(self, file_path): """计算文件哈希值用于缓存""" try: hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() except Exception as e: self.status_var.set(f"计算文件哈希失败: {str(e)}") return str(os.path.getmtime(file_path)) def get_cache_filename(self, file_path): """获取缓存文件名""" file_hash = self.get_file_hash(file_path) return os.path.join(self.cache_dir, f"{os.path.basename(file_path)}_{file_hash}.cache") def load_header_cache(self, file_path): """加载列名缓存""" cache_file = self.get_cache_filename(file_path) if os.path.exists(cache_file): try: with open(cache_file, "r", encoding='utf-8') as f: return json.load(f) except: return None return None def save_header_cache(self, file_path, header_info): """保存列名缓存""" cache_file = self.get_cache_filename(file_path) try: with open(cache_file, "w", encoding='utf-8') as f: json.dump(header_info, f) return True except: return False def find_header_row(self, file_path): """查找列名行(增强版)""" # 禁用缓存进行测试 # return None, None # 检查缓存 cache = self.load_header_cache(file_path) if cache: return cache.get("header_row"), cache.get("signal_col") # 没有缓存则重新查找 if file_path.lower().endswith((".xlsx", ".xlsm")): return self.find_header_row_openpyxl(file_path) elif file_path.lower().endswith(".xls"): return self.find_header_row_xlrd(file_path) return None, None def find_header_row_openpyxl(self, file_path): """使用openpyxl查找列名行(增强版)""" try: self.log(f"开始处理文件: {os.path.basename(file_path)}") wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True) ws = wb.active # 尝试多种列名匹配模式 patterns = [ r'データ名', r'データ名', r'信号名', r'Signal Name', r'Data Name', r'信号名称', r'データ名称', r'信号' ] # 扩大搜索范围:前100行和前200列 for row_idx in range(1, 101): for col_idx in range(1, 201): try: cell = ws.cell(row=row_idx, column=col_idx) cell_value = cell.value if not cell_value: continue cell_str = str(cell_value) for pattern in patterns: if re.search(pattern, cell_str, re.IGNORECASE): self.log(f"找到匹配模式 '{pattern}' 在行{row_idx}列{col_idx}") # 找到列名行后,尝试确定信号列 signal_col = None # 在同行中查找信号列 for col_idx2 in range(1, 101): # 1-100列 try: cell2 = ws.cell(row=row_idx, column=col_idx2) cell2_value = cell2.value if not cell2_value: continue cell2_str = str(cell2_value) if re.search(pattern, cell2_str, re.IGNORECASE): signal_col = col_idx2 break except: continue # 保存缓存 if signal_col is not None: header_info = {"header_row": row_idx, "signal_col": signal_col} self.save_header_cache(file_path, header_info) wb.close() return row_idx, signal_col except: continue wb.close() except Exception as e: self.log(f"查找列名行出错: {str(e)}") return None, None def find_header_row_xlrd(self, file_path): """使用xlrd查找列名行(增强版)""" try: wb = xlrd.open_workbook(file_path) ws = wb.sheet_by_index(0) # 尝试多种列名匹配模式 patterns = [ r'データ名', # 半角片假名 r'データ名', # 全角片假名 r'信号名', # 中文 r'Signal Name', # 英文 r'Data Name', r'信号名称', r'データ名称' ] # 扩大搜索范围:前50行和前100列 for row_idx in range(0, 50): # 0-49行 # 扩大列搜索范围到100列 for col_idx in range(0, 100): # 0-99列 try: cell_value = ws.cell_value(row_idx, col_idx) if not cell_value: continue # 尝试所有匹配模式 cell_str = str(cell_value) for pattern in patterns: if re.search(pattern, cell_str, re.IGNORECASE): # 找到列名行后,尝试确定信号列 signal_col = None # 在同行中查找信号列 for col_idx2 in range(0, 100): # 0-99列 try: cell2_value = ws.cell_value(row_idx, col_idx2) if not cell2_value: continue cell2_str = str(cell2_value) if re.search(pattern, cell2_str, re.IGNORECASE): signal_col = col_idx2 break except: continue # 保存缓存 if signal_col is not None: header_info = {"header_row": row_idx, "signal_col": signal_col} self.save_header_cache(file_path, header_info) return row_idx, signal_col except: continue except Exception as e: self.status_var.set(f"查找列名行出错: {str(e)}") return None, None def start_search_thread(self): """启动搜索线程""" if self.search_running: return self.search_running = True self.stop_requested = False self.max_workers = int(self.thread_var.get()) threading.Thread(target=self.search_files, daemon=True).start() def stop_search(self): """停止搜索""" self.stop_requested = True self.status_var.set("正在停止搜索...") def search_files(self): """在文件中搜索内容(优化特殊文件处理)""" search_term = self.search_text.get().strip() if not search_term: self.status_var.set("请输入搜索内容") self.search_running = False return if not self.files: self.status_var.set("请先选择文件夹") self.search_running = False return # 重置结果和UI self.results = {} for item in self.tree.get_children(): self.tree.delete(item) total_files = len(self.files) processed_files = 0 found_signals = 0 # 使用线程池处理文件 # 在search_files方法中添加详细进度 with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = {} for i, file_path in enumerate(self.files): if self.stop_requested: break future = executor.submit(self.process_file, file_path, search_term) futures[future] = (file_path, i) # 保存文件索引 for future in as_completed(futures): if self.stop_requested: break file_path, idx = futures[future] try: found = future.result() found_signals += found processed_files += 1 # 更详细的进度反馈 progress = int(processed_files / total_files * 100) self.progress["value"] = progress self.status_var.set( f"已处理 {processed_files}/{total_files} 个文件 | " f"当前: {os.path.basename(file_path)} | " f"找到: {found_signals} 个匹配" ) self.root.update_idletasks() except Exception as e: self.status_var.set(f"处理文件 {os.path.basename(file_path)} 出错: {str(e)}") # 更新结果 if self.stop_requested: self.status_var.set(f"搜索已停止,已处理 {processed_files}/{total_files} 个文件") elif found_signals == 0: self.status_var.set(f"未找到包含 '{search_term}' 的信号") else: self.status_var.set(f"找到 {len(self.results)} 个匹配信号,共 {found_signals} 处匹配") self.update_text_view() self.progress["value"] = 0 self.search_running = False gc.collect() # 强制垃圾回收释放内存 def process_file(self, file_path, search_term): """处理单个文件(增强异常处理)""" found = 0 try: # 获取列名行和信号列 header_row, signal_col = self.find_header_row(file_path) # 如果自动查找失败,尝试手动模式 if header_row is None or signal_col is None: self.status_var.set(f"文件 {os.path.basename(file_path)} 未找到列名行,尝试手动查找...") header_row, signal_col = self.manual_find_header_row(file_path) if header_row is None or signal_col is None: self.status_var.set(f"文件 {os.path.basename(file_path)} 无法确定列名行,已跳过") return found # 使用pandas处理所有Excel文件类型 found = self.process_file_with_pandas(file_path, search_term, header_row, signal_col) except Exception as e: self.status_var.set(f"处理文件 {os.path.basename(file_path)} 出错: {str(e)}") return found def manual_column_select(self): """手动指定列名位置(增强版)""" if not self.files: messagebox.showinfo("提示", "请先选择文件夹") return # 创建手动选择窗口 manual_window = tk.Toplevel(self.root) manual_window.title("手动指定列名位置") manual_window.geometry("500x400") # 文件选择 ttk.Label(manual_window, text="选择文件:").pack(pady=(10, 5)) file_var = tk.StringVar() file_combo = ttk.Combobox(manual_window, textvariable=file_var, values=[os.path.basename(f) for f in self.files], width=40) file_combo.pack(fill=tk.X, padx=20, pady=5) file_combo.current(0) # 预览框架 preview_frame = ttk.Frame(manual_window) preview_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 表格预览 columns = ("列", "值") self.preview_tree = ttk.Treeview(preview_frame, columns=columns, show="headings", height=10) # 设置列标题 for col in columns: self.preview_tree.heading(col, text=col) self.preview_tree.column(col, width=100, anchor=tk.W) # 添加滚动条 scrollbar = ttk.Scrollbar(preview_frame, orient=tk.VERTICAL, command=self.preview_tree.yview) self.preview_tree.configure(yscrollcommand=scrollbar.set) self.preview_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) # 加载预览数据 def load_preview(event=None): file_idx = file_combo.current() file_path = self.files[file_idx] # 清空现有预览 for item in self.preview_tree.get_children(): self.preview_tree.delete(item) # 加载前10行数据 try: if file_path.lower().endswith((".xlsx", ".xlsm")): wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True) ws = wb.active # 读取前10行 for row_idx in range(1, 11): for col_idx in range(1, 51): # 前50列 try: cell = ws.cell(row=row_idx, column=col_idx) if cell.value is not None: self.preview_tree.insert("", tk.END, values=( f"行{row_idx}列{col_idx}", str(cell.value)[:50] # 限制显示长度 )) except: continue wb.close() elif file_path.lower().endswith(".xls"): wb = xlrd.open_workbook(file_path) ws = wb.sheet_by_index(0) # 读取前10行 for row_idx in range(0, 10): for col_idx in range(0, 50): # 前50列 try: cell_value = ws.cell_value(row_idx, col_idx) if cell_value: self.preview_tree.insert("", tk.END, values=( f"行{row_idx+1}列{col_idx+1}", str(cell_value)[:50] # 限制显示长度 )) except: continue except Exception as e: messagebox.showerror("错误", f"加载预览失败: {str(e)}") file_combo.bind("<<ComboboxSelected>>", load_preview) load_preview() # 初始加载 # 输入框架 input_frame = ttk.Frame(manual_window) input_frame.pack(fill=tk.X, padx=20, pady=10) # 行号输入 ttk.Label(input_frame, text="列名行号:").grid(row=0, column=0, sticky=tk.W) row_var = tk.StringVar(value="1") row_entry = ttk.Entry(input_frame, textvariable=row_var, width=10) row_entry.grid(row=0, column=1, padx=5) # 列号输入 ttk.Label(input_frame, text="信号列号:").grid(row=0, column=2, sticky=tk.W, padx=(10,0)) col_var = tk.StringVar(value="1") col_entry = ttk.Entry(input_frame, textvariable=col_var, width=10) col_entry.grid(row=0, column=3, padx=5) # 确认按钮 def confirm_selection(): try: file_idx = file_combo.current() file_path = self.files[file_idx] header_row = int(row_var.get()) signal_col = int(col_var.get()) # 保存到缓存 header_info = {"header_row": header_row, "signal_col": signal_col} self.save_header_cache(file_path, header_info) messagebox.showinfo("成功", f"已为 {os.path.basename(file_path)} 设置列名位置:行{header_row} 列{signal_col}") manual_window.destroy() except Exception as e: messagebox.showerror("错误", f"无效输入: {str(e)}") ttk.Button(manual_window, text="确认", command=confirm_selection).pack(pady=10) def process_file_with_pandas(self, file_path, search_term, header_row, signal_col): """使用pandas高效处理Excel文件(优化版)""" found = 0 try: # 添加文件信息日志 file_size = os.path.getsize(file_path) short_name = os.path.basename(file_path) self.status_var.set(f"处理文件: {short_name} ({file_size}字节)") self.root.update_idletasks() # 使用pandas读取Excel文件 file_ext = os.path.splitext(file_path)[1].lower() engine = 'openpyxl' if file_ext in ['.xlsx', '.xlsm'] else 'xlrd' # 动态确定要读取的列范围(智能调整) # 计算最大可用列数 max_columns = self.get_max_columns(file_path) # 扩大列范围(前后10列) start_col = max(1, signal_col - 10) end_col = min(max_columns, signal_col + 10) # 确保信号列在读取范围内 if signal_col < start_col or signal_col > end_col: # 如果信号列不在范围内,调整读取范围 start_col = max(1, signal_col - 10) end_col = min(max_columns, signal_col + 10) # 计算信号列在DataFrame中的索引 signal_col_idx = signal_col - start_col # 确保索引有效 if signal_col_idx < 0 or signal_col_idx >= (end_col - start_col + 1): self.status_var.set(f"文件 {short_name}: 信号列索引计算错误") return 0 # 验证列位置 try: if file_path.lower().endswith((".xlsx", ".xlsm")): wb = openpyxl.load_workbook(file_path, read_only=True) ws = wb.active actual_col_name = ws.cell(row=header_row, column=signal_col).value wb.close() self.status_var.set(f"文件 {short_name}: 信号列 '{actual_col_name}' (位置 {signal_col})") elif file_path.lower().endswith(".xls"): wb = xlrd.open_workbook(file_path) ws = wb.sheet_by_index(0) actual_col_name = ws.cell_value(header_row, signal_col-1) self.status_var.set(f"文件 {short_name}: 信号列 '{actual_col_name}' (位置 {signal_col})") except Exception as e: self.status_var.set(f"列验证失败: {str(e)}") # 读取数据 df = pd.read_excel( file_path, engine=engine, header=header_row-1, usecols=range(start_col-1, end_col), dtype=str ) # 获取实际列名 column_names = df.columns.tolist() # 获取信号列数据(通过位置索引) if signal_col_idx < len(df.columns): signal_series = df.iloc[:, signal_col_idx] else: self.status_var.set(f"文件 {short_name}: 信号列超出范围") return 0 # 搜索匹配的信号 # 处理可能的NaN值 signal_series = signal_series.fillna('') # 更灵活的匹配逻辑 matches = df[signal_series.str.contains( re.escape(search_term), case=False, na=False, regex=True )] # 处理匹配行 for idx, row in matches.iterrows(): # 只显示有值的列 row_content = [] for col_idx, value in enumerate(row): # 跳过空值 if pd.notna(value) and str(value).strip() != '': # 使用实际列名 if col_idx < len(column_names): col_name = column_names[col_idx] else: col_name = f"列{start_col + col_idx}" row_content.append(f"{col_name}: {str(value).strip()}") row_content = "\n".join(row_content) signal_value = row.iloc[signal_col_idx] # 使用位置索引获取信号值 # 使用更唯一的复合键(包含行索引) signal_key = f"{signal_value}||{short_name}||{idx}" # 添加到结果集 self.results[signal_key] = { "signal": signal_value, "file": short_name, "content": row_content } # 添加到表格 summary = row_content[:50] + "..." if len(row_content) > 50 else row_content self.tree.insert("", tk.END, values=(signal_value, short_name, summary)) found += 1 # 每处理10行更新一次UI if found % 10 == 0: self.status_var.set(f"处理 {short_name}: 找到 {found} 个匹配") self.root.update_idletasks() # 添加完成日志 self.status_var.set(f"文件 {short_name} 处理完成: 找到 {found} 个匹配") except Exception as e: import traceback traceback.print_exc() self.status_var.set(f"处理文件 {short_name} 出错: {str(e)}") finally: # 显式释放内存 if 'df' in locals(): del df if 'matches' in locals(): del matches gc.collect() return found def get_max_columns(self, file_path): """获取Excel文件的最大列数""" try: if file_path.lower().endswith((".xlsx", ".xlsm")): wb = openpyxl.load_workbook(file_path, read_only=True) ws = wb.active max_col = ws.max_column wb.close() return max_col elif file_path.lower().endswith(".xls"): wb = xlrd.open_workbook(file_path) ws = wb.sheet_by_index(0) return ws.ncols except: return 100 # 默认值 return 100 # 默认值 def update_text_view(self): """更新文本对比视图""" # 清除现有文本区域 for widget in self.compare_container.winfo_children(): widget.destroy() if not self.results: return # 获取第一个信号作为默认显示 first_signal_key = next(iter(self.results.keys())) self.display_signal_comparison(first_signal_key) def on_table_select(self, event): """表格选择事件处理""" selected = self.tree.selection() if not selected: return item = self.tree.item(selected[0]) signal_value = item["values"][0] # 获取信号值 # 直接传递信号值给显示方法 self.display_signal_comparison(signal_value) def display_signal_comparison(self, signal_value): """显示指定信号在不同文件中的对比""" # 清除现有文本区域 for widget in self.compare_container.winfo_children(): widget.destroy() # 获取包含该信号的所有结果项 signal_items = [ (key, data) for key, data in self.results.items() if data["signal"] == signal_value ] if not signal_items: return # 按文件名排序 signal_items.sort(key=lambda x: x[1]["file"]) # 创建列框架 for i, (signal_key, signal_data) in enumerate(signal_items): col_frame = ttk.Frame(self.compare_container) col_frame.grid(row=0, column=i, sticky="nsew", padx=5, pady=5) self.compare_container.columnconfigure(i, weight=1) # 文件名标签 file_label = ttk.Label(col_frame, text=signal_data["file"], font=("Arial", 10, "bold")) file_label.pack(fill=tk.X, pady=(0, 5)) # 信号名标签 signal_label = ttk.Label(col_frame, text=signal_data["signal"], font=("Arial", 9, "italic")) signal_label.pack(fill=tk.X, pady=(0, 5)) # 文本区域 text_area = scrolledtext.ScrolledText(col_frame, wrap=tk.WORD, width=30, height=15) text_area.insert(tk.INSERT, signal_data["content"]) text_area.configure(state="disabled") text_area.pack(fill=tk.BOTH, expand=True) # 保存引用 self.text_panes[signal_key] = text_area def highlight_differences(self): """高亮显示文本差异""" if not self.text_panes: return # 获取所有行内容 all_contents = [] for text_area in self.text_panes.values(): text_area.configure(state="normal") text = text_area.get("1.0", tk.END).strip() text_area.configure(state="disabled") all_contents.append(text) # 如果所有内容相同,则不需要高亮 if len(set(all_contents)) == 1: self.status_var.set("所有文件行内容完全一致") return # 使用第一个文件作为基准 base_text = all_contents[0] # 对比并高亮差异 for i, (file, text_area) in enumerate(self.text_panes.items()): if i == 0: # 基准文件不需要处理 continue text_area.configure(state="normal") text_area.tag_configure("diff", background=self.highlight_color) # 清除之前的高亮 text_area.tag_remove("diff", "1.0", tk.END) # 获取当前文本 compare_text = text_area.get("1.0", tk.END).strip() # 使用序列匹配器查找差异 s = SequenceMatcher(None, base_text, compare_text) # 高亮差异部分 for tag in s.get_opcodes(): opcode = tag[0] start = tag[3] end = tag[4] if opcode != "equal": # 添加高亮标签 text_area.tag_add("diff", f"1.0+{start}c", f"1.0+{end}c") text_area.configure(state="disabled") self.status_var.set("差异已高亮显示") def choose_color(self): """选择高亮颜色""" color = askcolor(title="选择高亮颜色", initialcolor=self.highlight_color) if color[1]: self.highlight_color = color[1] self.color_btn.configure(bg=self.highlight_color) def export_report(self): """导出差异报告""" if not self.results: messagebox.showwarning("警告", "没有可导出的结果") return try: # 创建报告数据结构 report_data = [] for signal, files_data in self.results.items(): for file, content in files_data.items(): report_data.append({ "信号": signal, "文件": file, "行内容": content }) # 转换为DataFrame df = pd.DataFrame(report_data) # 保存到Excel save_path = filedialog.asksaveasfilename( defaultextension=".xlsx", filetypes=[("Excel文件", "*.xlsx")], title="保存差异报告" ) if save_path: df.to_excel(save_path, index=False) self.status_var.set(f"报告已保存到: {save_path}") except Exception as e: messagebox.showerror("错误", f"导出报告失败: {str(e)}") def clear_cache(self): """清除缓存""" try: for file in os.listdir(self.cache_dir): if file.endswith(".cache"): os.remove(os.path.join(self.cache_dir, file)) self.file_cache = {} self.column_cache = {} self.status_var.set("缓存已清除") except Exception as e: self.status_var.set(f"清除缓存失败: {str(e)}") def manual_column_select(self): """手动指定列名位置""" if not self.files: messagebox.showinfo("提示", "请先选择文件夹") return # 创建手动选择窗口 manual_window = tk.Toplevel(self.root) manual_window.title("手动指定列名位置") manual_window.geometry("400x300") # 文件选择 ttk.Label(manual_window, text="选择文件:").pack(pady=(10, 5)) file_var = tk.StringVar() file_combo = ttk.Combobox(manual_window, textvariable=file_var, values=[os.path.basename(f) for f in self.files]) file_combo.pack(fill=tk.X, padx=20, pady=5) file_combo.current(0) # 行号输入 ttk.Label(manual_window, text="列名行号:").pack(pady=(10, 5)) row_var = tk.StringVar(value="1") row_entry = ttk.Entry(manual_window, textvariable=row_var) row_entry.pack(fill=tk.X, padx=20, pady=5) # 列号输入 ttk.Label(manual_window, text="信号列号:").pack(pady=(10, 5)) col_var = tk.StringVar(value="1") col_entry = ttk.Entry(manual_window, textvariable=col_var) col_entry.pack(fill=tk.X, padx=20, pady=5) # 确认按钮 def confirm_selection(): try: file_idx = file_combo.current() file_path = self.files[file_idx] header_row = int(row_var.get()) signal_col = int(col_var.get()) # 保存到缓存 header_info = {"header_row": header_row, "signal_col": signal_col} self.save_header_cache(file_path, header_info) messagebox.showinfo("成功", f"已为 {os.path.basename(file_path)} 设置列名位置:行{header_row} 列{signal_col}") manual_window.destroy() except Exception as e: messagebox.showerror("错误", f"无效输入: {str(e)}") ttk.Button(manual_window, text="确认", command=confirm_selection).pack(pady=20) if __name__ == "__main__": root = tk.Tk() app = EnhancedSignalComparator(root) root.mainloop() 日志内容为:2025-07-23T13:33:19.517561 - 日志初始化 上述两个问题还是没有解决:1、文件夹内一共三个文件,每个文件中都有只有一个信号,但是实际上,901_CAN送受信値-f.xlsx中没有找到信号,在【ドラフト版】D01D-00-02(HEV車).xlsm与【ドラフト版】D01D-00-03(コンベ車).xlsx分别找到了两个信号,但实际上分别只有一个信号。 2、行内容比对中【ドラフト版】D01D-00-02(HEV車).xlsm与【ドラフト版】D01D-00-03(コンベ車).xlsx搜索到的内容相同,但实际上有不同的
时间: 2025-07-23 20:55:11 浏览: 12
<think>根据日志内容,程序已经初始化,但问题仍然存在。我们需要进一步分析问题原因并提供解决方案。
### 问题1:901_CAN送受信値-f.xlsx中没有找到信号
**可能原因**:
1. **列名行识别失败**:该文件的列名行可能不在程序搜索的前100行或前200列内,或者列名不符合预设的模式。
2. **文件格式问题**:该文件可能是二进制格式(.xlsb)或加密文件,导致无法读取。
3. **信号列位置特殊**:信号列位置可能不在程序读取的范围内(前后10列)。
4. **信号值不匹配**:搜索的信号在该文件中可能不存在或格式不一致(如全角/半角、空格等)。
**解决方案**:
1. **手动指定列名位置**:使用程序中的手动指定功能,为该文件指定列名行和信号列。
2. **扩大搜索范围**:修改`find_header_row_openpyxl`和`find_header_row_xlrd`方法,增加搜索行数和列数。
3. **增加列名模式**:在`patterns`列表中添加更多可能的列名模式,比如该文件可能使用的特定列名。
4. **调试日志**:在日志中记录每个文件的处理详情,包括读取的列范围、匹配到的信号值等。
### 问题2:两个文件搜索到的内容相同,但实际上有不同
**可能原因**:
1. **行索引相同导致键重复**:虽然文件不同,但行索引相同且信号值相同,导致结果被覆盖。
2. **读取的列范围不足**:只读取了信号列前后10列,可能未包含有差异的列。
3. **数据清洗问题**:在生成行内容时,可能因为数据类型(如数字、日期)导致显示相同。
4. **信号值匹配不精确**:匹配时使用了`re.escape`和`case=False`,可能匹配到多个信号。
**解决方案**:
1. **确保键的唯一性**:在创建`signal_key`时,加入文件路径和行索引,确保键的唯一性。
2. **扩大列范围**:将列范围扩大到前后20列,确保包含差异列。
3. **精确数据类型**:在读取Excel时,不指定`dtype=str`,保留原始数据类型,并在生成行内容时进行类型转换。
4. **精确匹配**:在搜索信号时,使用更精确的匹配方式(如完全匹配或正则表达式锚定)。
### 具体代码修改
#### 1. 扩大列名搜索范围(针对问题1)
在`find_header_row_openpyxl`方法中,将搜索范围扩大到前200行和前300列:
```python
def find_header_row_openpyxl(self, file_path):
try:
self.log(f"开始处理文件: {os.path.basename(file_path)}")
wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
ws = wb.active
# 扩大搜索范围:前200行和前300列
for row_idx in range(1, 201): # 修改为200行
for col_idx in range(1, 301): # 修改为300列
# ... 原有代码 ...
```
#### 2. 增加列名模式(针对问题1)
在`patterns`列表中添加更多可能的列名:
```python
patterns = [
r'データ名', r'データ名', r'信号名', r'Signal Name',
r'Data Name', r'信号名称', r'データ名称', r'信号',
r'データー名', r'DataItem', r'Signal' # 新增模式
]
```
#### 3. 扩大读取列范围(针对问题2)
在`process_file_with_pandas`方法中,扩大读取列范围到前后20列:
```python
# 扩大列范围(前后20列)
start_col = max(1, signal_col - 20)
end_col = min(max_columns, signal_col + 20)
```
#### 4. 确保键的唯一性(针对问题2)
在`process_file_with_pandas`方法中,修改`signal_key`的生成方式,加入文件完整路径和行索引:
```python
# 使用文件路径和行索引确保唯一性
signal_key = f"{file_path}||{idx}||{signal_value}"
```
#### 5. 保留原始数据类型(针对问题2)
在读取Excel时,不指定`dtype=str`,这样pandas会尝试保留原始数据类型。然后在生成行内容时,将值转换为字符串:
```python
# 读取数据时不指定dtype
df = pd.read_excel(
file_path,
engine=engine,
header=header_row-1,
usecols=range(start_col-1, end_col)
# 移除 dtype=str
)
# ... 在生成行内容时 ...
for col_idx, value in enumerate(row):
# 将值转换为字符串,并处理NaN
str_value = str(value) if not pd.isna(value) else ""
if str_value.strip() != '':
# ... 使用str_value ...
```
#### 6. 精确匹配信号(针对问题2)
在匹配信号时,使用锚定正则表达式确保完全匹配(如果需要):
```python
# 如果希望完全匹配,使用正则锚定
pattern = f"^{re.escape(search_term)}$"
matches = df[signal_series.str.contains(
pattern,
case=False,
na=False,
regex=True
)]
```
### 完整修改后的`process_file_with_pandas`方法
```python
def process_file_with_pandas(self, file_path, search_term, header_row, signal_col):
"""使用pandas高效处理Excel文件(优化版)"""
found = 0
try:
# 添加文件信息日志
file_size = os.path.getsize(file_path)
short_name = os.path.basename(file_path)
self.log(f"处理文件: {short_name} ({file_size}字节)")
self.root.update_idletasks()
# 使用pandas读取Excel文件
file_ext = os.path.splitext(file_path)[1].lower()
engine = 'openpyxl' if file_ext in ['.xlsx', '.xlsm'] else 'xlrd'
# 动态确定要读取的列范围(智能调整)
# 计算最大可用列数
max_columns = self.get_max_columns(file_path)
# 扩大列范围(前后20列)
start_col = max(1, signal_col - 20)
end_col = min(max_columns, signal_col + 20)
# 确保信号列在读取范围内
if signal_col < start_col or signal_col > end_col:
# 如果信号列不在范围内,调整读取范围
start_col = max(1, signal_col - 20)
end_col = min(max_columns, signal_col + 20)
# 计算信号列在DataFrame中的索引
signal_col_idx = signal_col - start_col
# 确保索引有效
if signal_col_idx < 0 or signal_col_idx >= (end_col - start_col + 1):
self.log(f"文件 {short_name}: 信号列索引计算错误")
return 0
# 验证列位置
try:
if file_path.lower().endswith((".xlsx", ".xlsm")):
wb = openpyxl.load_workbook(file_path, read_only=True)
ws = wb.active
actual_col_name = ws.cell(row=header_row, column=signal_col).value
wb.close()
self.log(f"文件 {short_name}: 信号列 '{actual_col_name}' (位置 {signal_col})")
elif file_path.lower().endswith(".xls"):
wb = xlrd.open_workbook(file_path)
ws = wb.sheet_by_index(0)
actual_col_name = ws.cell_value(header_row, signal_col-1)
self.log(f"文件 {short_name}: 信号列 '{actual_col_name}' (位置 {signal_col})")
except Exception as e:
self.log(f"列验证失败: {str(e)}")
# 读取数据(不指定dtype以保留原始类型)
df = pd.read_excel(
file_path,
engine=engine,
header=header_row-1,
usecols=range(start_col-1, end_col)
)
# 获取实际列名
column_names = df.columns.tolist()
# 获取信号列数据(通过位置索引)
if signal_col_idx < len(df.columns):
signal_series = df.iloc[:, signal_col_idx]
else:
self.log(f"文件 {short_name}: 信号列超出范围")
return 0
# 搜索匹配的信号
# 处理可能的NaN值
signal_series = signal_series.fillna('')
# 更灵活的匹配逻辑(使用正则表达式锚定进行精确匹配)
pattern = f"^{re.escape(search_term)}$"
matches = df[signal_series.str.contains(
pattern,
case=False,
na=False,
regex=True
)]
# 处理匹配行
for idx, row in matches.iterrows():
# 只显示有值的列
row_content = []
for col_idx, value in enumerate(row):
# 将值转换为字符串,并跳过空值
if pd.isna(value):
continue
str_value = str(value).strip()
if str_value != '':
# 使用实际列名
if col_idx < len(column_names):
col_name = column_names[col_idx]
else:
col_name = f"列{start_col + col_idx}"
row_content.append(f"{col_name}: {str_value}")
row_content = "\n".join(row_content)
signal_value = row.iloc[signal_col_idx] # 使用位置索引获取信号值
if not isinstance(signal_value, str):
signal_value = str(signal_value)
# 使用文件路径和行索引确保键的唯一性
signal_key = f"{file_path}||{idx}||{signal_value}"
# 添加到结果集
self.results[signal_key] = {
"signal": signal_value,
"file": short_name,
"content": row_content
}
# 添加到表格
summary = row_content[:50] + "..." if len(row_content) > 50 else row_content
self.tree.insert("", tk.END, values=(signal_value, short_name, summary))
found += 1
# 每处理10行更新一次UI
if found % 10 == 0:
self.log(f"处理 {short_name}: 找到 {found} 个匹配")
self.root.update_idletasks()
# 添加完成日志
self.log(f"文件 {short_name} 处理完成: 找到 {found} 个匹配")
except Exception as e:
import traceback
self.log(f"处理文件 {short_name} 出错: {str(e)}")
traceback.print_exc()
finally:
# 显式释放内存
if 'df' in locals():
del df
if 'matches' in locals():
del matches
gc.collect()
return found
```
### 调试建议
1. **检查日志文件**:查看每个文件的处理日志,特别是901_CAN送受信値-f.xlsx的日志,看是否有列名行识别失败的记录。
2. **手动指定列位置**:对901_CAN送受信値-f.xlsx使用手动指定功能,确保正确指定列名行和信号列。
3. **检查文件内容**:手动打开文件,确认信号是否存在,以及列名行位置。
4. **比较实际内容**:对于两个显示相同但实际上不同的文件,检查日志中记录的行内容,与文件实际内容对比。
### 相关问题
§§相关问题§§:
1. 如何处理Excel文件中的合并单元格?
2. 如何优化大型Excel文件的读取性能?
3. 如何实现Excel文件的差异对比和合并?
4. 如何处理Excel文件中的多语言编码问题?</think>### 问题分析与解决方案
#### 问题1:901_CAN送受信値-f.xlsx未找到信号
**原因分析**:
1. **列名行识别失败**:该文件可能使用非标准列名格式
2. **信号列位置特殊**:信号列可能不在自动检测的范围内
3. **文件格式问题**:可能是特殊格式的Excel文件(如加密或宏启用)
4. **搜索词匹配问题**:信号值可能不完全匹配搜索词
**解决方案**:
1. 添加详细的调试日志
2. 改进列名行识别逻辑
3. 添加信号值精确匹配选项
```python
def process_file_with_pandas(self, file_path, search_term, header_row, signal_col):
"""使用pandas高效处理Excel文件(优化版)"""
found = 0
try:
# ... [原有代码] ...
# 精确匹配模式开关
exact_match = False # 可设置为用户可配置选项
# 搜索匹配的信号
signal_series = signal_series.fillna('')
if exact_match:
# 精确匹配模式
matches = df[signal_series.str.strip().str.lower() == search_term.lower().strip()]
else:
# 模糊匹配模式
matches = df[signal_series.str.contains(
re.escape(search_term),
case=False,
na=False,
regex=True
)]
# 添加匹配结果日志
self.log(f"文件 {short_name} 找到 {len(matches)} 个匹配")
for idx, row in matches.iterrows():
signal_value = row.iloc[signal_col_idx]
self.log(f"匹配行 {idx}: 信号值='{signal_value}'")
# ... [后续代码] ...
```
#### 问题2:不同文件显示相同内容
**原因分析**:
1. **结果覆盖问题**:相同信号值在不同文件中的行被覆盖
2. **行索引未包含**:未考虑行索引导致相同信号值的不同行被合并
3. **数据读取范围不足**:读取的列范围不够导致显示相同
**解决方案**:
1. 使用更唯一的复合键
2. 确保包含行索引和文件路径
3. 扩大数据读取范围
```python
def process_file_with_pandas(self, file_path, search_term, header_row, signal_col):
"""使用pandas高效处理Excel文件(优化版)"""
found = 0
try:
# ... [原有代码] ...
# 处理匹配行
for idx, row in matches.iterrows():
# ... [行内容生成] ...
# 使用唯一复合键(文件路径+行索引+信号值)
signal_key = f"{file_path}||{idx}||{signal_value}"
# 添加到结果集
self.results[signal_key] = {
"signal": signal_value,
"file": short_name,
"content": row_content,
"file_path": file_path, # 保存完整路径
"row_idx": idx # 保存行索引
}
# ... [后续代码] ...
```
### 完整优化方案
#### 1. 增强日志系统
```python
def process_file(self, file_path, search_term):
"""处理单个文件(增强异常处理和调试)"""
found = 0
short_name = os.path.basename(file_path)
try:
# 获取列名行和信号列
header_row, signal_col = self.find_header_row(file_path)
self.log(f"文件 {short_name}: 自动查找结果 - 列名行: {header_row}, 信号列: {signal_col}")
# 如果自动查找失败,尝试手动模式
if header_row is None or signal_col is None:
self.log(f"文件 {short_name} 未找到列名行,尝试手动查找...")
header_row, signal_col = self.manual_find_header_row(file_path)
self.log(f"文件 {short_name}: 手动查找结果 - 列名行: {header_row}, 信号列: {signal_col}")
if header_row is None or signal_col is None:
self.log(f"文件 {short_name} 无法确定列名行,已跳过")
return found
# 使用pandas处理
found = self.process_file_with_pandas(file_path, search_term, header_row, signal_col)
self.log(f"文件 {short_name} 处理完成,找到 {found} 个匹配")
except Exception as e:
error_msg = f"处理文件 {short_name} 出错: {str(e)}"
self.log(error_msg)
import traceback
traceback.print_exc()
return found
```
#### 2. 改进列名行识别逻辑
```python
def find_header_row_openpyxl(self, file_path):
"""使用openpyxl查找列名行(增强版)"""
try:
self.log(f"开始处理文件: {os.path.basename(file_path)}")
wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
ws = wb.active
# 尝试多种列名匹配模式
patterns = [
r'データ名', r'データ名', r'信号名', r'Signal Name',
r'Data Name', r'信号名称', r'データ名称', r'信号',
r'データー名', r'DataItem', r'Signal' # 新增模式
]
# 扩大搜索范围:前100行和前200列
for row_idx in range(1, 101):
for col_idx in range(1, 201):
try:
cell = ws.cell(row=row_idx, column=col_idx)
cell_value = cell.value
if cell_value is None:
continue
cell_str = str(cell_value)
# 记录前10行前10列的值用于调试
if row_idx <= 10 and col_idx <= 10:
self.log(f"行{row_idx}列{col_idx}: '{cell_str}'")
for pattern in patterns:
if re.search(pattern, cell_str, re.IGNORECASE):
self.log(f"找到匹配模式 '{pattern}' 在行{row_idx}列{col_idx}")
# ... [后续逻辑] ...
except:
continue
# ... [其他代码] ...
except Exception as e:
self.log(f"查找列名行出错: {str(e)}")
return None, None
```
#### 3. 添加精确匹配选项到UI
```python
def create_widgets(self):
# ... [原有代码] ...
# 精确匹配选项
self.exact_match_var = tk.BooleanVar(value=False)
exact_match_check = ttk.Checkbutton(
control_frame,
text="精确匹配",
variable=self.exact_match_var
)
exact_match_check.grid(row=2, column=4, sticky=tk.W, padx=5, pady=(10,0))
```
#### 4. 在文本对比视图中显示完整差异
```python
def display_signal_comparison(self, signal_value):
"""显示指定信号在不同文件中的对比"""
# ... [原有代码] ...
# 创建列框架
for i, (signal_key, signal_data) in enumerate(signal_items):
# ... [原有代码] ...
# 添加"查看完整内容"按钮
btn_frame = ttk.Frame(col_frame)
btn_frame.pack(fill=tk.X, pady=(5, 0))
ttk.Button(
btn_frame,
text="查看完整内容",
command=lambda f=signal_data["file_path"], r=signal_data["row_idx"]: self.show_full_content(f, r)
).pack(side=tk.LEFT)
def show_full_content(self, file_path, row_idx):
"""显示行的完整内容"""
# 实现完整内容显示逻辑
self.log(f"显示完整内容: 文件={os.path.basename(file_path)}, 行={row_idx}")
# [具体实现代码]
```
### 关键优化点总结
1. **增强日志系统**:
- 记录每个文件处理的关键步骤
- 输出前10行前10列的值帮助调试
- 记录匹配结果详情
2. **改进列名识别**:
- 增加更多匹配模式
- 扩大搜索范围(行1-100,列1-200)
- 输出前10行前10列的值
3. **结果唯一性保证**:
- 使用`文件路径+行索引+信号值`作为复合键
- 避免相同信号值在不同文件中的覆盖
4. **精确匹配选项**:
- 添加UI复选框供用户选择
- 实现精确匹配和模糊匹配两种模式
### 调试建议
1. **检查日志文件**:
- 查看901_CAN送受信値-f.xlsx的处理日志
- 分析前10行前10列的值,确定列名位置
2. **手动指定列位置**:
- 对问题文件使用手动指定功能
- 根据日志中的前10行值确定正确位置
3. **验证文件内容**:
- 手动打开问题文件检查信号是否存在
- 确认信号值与搜索词是否完全匹配
4. **使用精确匹配模式**:
- 在UI中启用"精确匹配"选项
- 确保信号值完全匹配搜索词
### 相关问题
阅读全文
相关推荐













